当前位置:   article > 正文

ClickHouse物化视图消费kafka日志_kafka clickhouse 物化视图

kafka clickhouse 物化视图

1.创建kafka主题

./bin/kafka-topics.sh --create --topic wsdlog  --bootstrap-server localhost:9092

2.创建kafka主题表

 CREATE TABLE wsd.log_kafka
(
    `CONTENT` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'wsdlog', kafka_group_name = 'consumer-group1', kafka_format = 'TabSeparated', kafka_num_consumers = 1

3.创建实体表存储消费到的kafka数据

CREATE TABLE wsd.content
(
    `CONTENT` String,
    `addTime` DateTime
)
ENGINE = MergeTree
ORDER BY CONTENT

4.创建物化视图解析kafka引擎表日志并存储到实体表

CREATE MATERIALIZED VIEW wsd.log_content TO wsd.content
(
    `CONTENT` Nullable(String),
    `addTime` DateTime
) AS
SELECT
    CONTENT,
    now() AS addTime
FROM wsd.log_kafka

5.kafka生产数据,查看clickhouse实体表是否拿到数据

./kafka-console-producer.sh --topic wsdlog --bootstrap-server  localhost:9092

 clickhouse实体表也是成功取到数据了

也可以不建立实体表

CREATE MATERIALIZED VIEW wsd.log_content2
(
    `CONTENT` Nullable(String),
    `addTime` DateTime
)
ENGINE = MergeTree
ORDER BY addTime
SETTINGS index_granularity = 8192 AS
SELECT
    CONTENT,
    now() AS addTime
FROM wsd.log_kafka

 clickhouse会自动建立.innner开头的实体表存储

根据不同的场景,可以对物化视图的查询语句进行修改,比如需要针对一些json的解析存储等

如下,这样日志解析存库就可以实时消费kafka日志

CREATE MATERIALIZED VIEW default.nginx_view TO default.nginx ( `time64` DateTime64(6, 'Asia/Shanghai'), `time` DateTime('Asia/Shanghai'), `time_ch` DateTime, `host` String) AS SELECT toDateTime64(extract(CONTENT, '^\\s*(\\d{4,4}\\-\\d\\d\\-\\d\\dT\\d\\d:\\d\\d:\\d\\d(\\.\\d{1,6})?)'), 6, 'Asia/Shanghai') AS time64, toDateTime(time64) AS time, now() AS time_ch, JSONExtractString(conn, 'host') AS host FROM ( SELECT CONTENT, replaceRegexpAll(replaceRegexpAll(CONTENT, '^.+?nginx\\-legal\\s*', ''), ',"request_body":"{.*?}"', '') AS conn FROM default.nginx_kafka )

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/418378
推荐阅读
相关标签
  

闽ICP备14008679号