创建kafka消费引擎

可以直接从kafka从消费数据

  • 指定Kafka地址、topic、消费组名称
  • 指定kafka中每个json数据的类型
    CREATE TABLE kafka_data
    (
      link String,
      requestTime DateTime,
      ip String,
      status Int32,
      x_cache String,
      size_response String,
      apiName String,
      appKey String,
    )
    ENGINE = Kafka()
    SETTINGS
      kafka_broker_list = '192.168.14.101:9092',
      kafka_topic_list = 'yuan_api',
      kafka_group_name = 'clickhouse',
      kafka_format = 'JSONEachRow';
    

创建数据表

用于存储实际的数据

  • 指定分区键,通过解析requestTime获取分区键,格式为YYYYMMDD
CREATE TABLE data
(
    link String,
    requestTime DateTime,
    ip String,
    status Int32,
    x_cache String,
    size_response String,
    apiName String,
    appKey String
)
ENGINE = MergeTree()
ORDER BY (requestTime)
PARTITION BY toYYYYMMDD(requestTime);

创建关联视图

将消费的kafka数据关联到数据表中

  • 将数据插入到数据表中
CREATE MATERIALIZED VIEW orders_mv TO data AS
SELECT
    link,
    CAST(requestTime AS DateTime) AS requestTime,
    ip,
    status,
    x_cache,
    size_response,
    apiName,
    appKey
FROM kafka_data;
Copyright © 运维知识库 all right reserved,powered by Gitbook文件修订时间: 2023-09-19 10:45:38

results matching ""

    No results matching ""