1. Create the Kafka topic
./bin/kafka-topics.sh --create --topic wsdlog --bootstrap-server localhost:9092
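As an optional check (assuming the same Kafka installation directory as above), you can confirm the topic exists by describing it:
./bin/kafka-topics.sh --describe --topic wsdlog --bootstrap-server localhost:9092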
2. Create the Kafka engine table
CREATE TABLE wsd.log_kafka
(
`CONTENT` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'wsdlog',
         kafka_group_name = 'consumer-group1',
         kafka_format = 'TabSeparated',
         kafka_num_consumers = 1
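Note that a SELECT run directly against a Kafka engine table consumes the messages it reads, so in normal operation the data is only read through materialized views. If consumption ever needs to be paused, for example while changing the downstream view, a common pattern (sketched here against the table above) is to detach and later re-attach the Kafka table:
DETACH TABLE wsd.log_kafka
ATTACH TABLE wsd.log_kafka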
3. Create the target table that stores the data consumed from Kafka
CREATE TABLE wsd.content
(
`CONTENT` String,
`addTime` DateTime
)
ENGINE = MergeTree
ORDER BY CONTENT
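ORDER BY CONTENT sorts the table by the raw log string. If the table will mainly be queried by time, a variant ordered (and optionally partitioned) by addTime may fit better; this is only a sketch of an alternative, with a hypothetical table name, not part of the original setup:
CREATE TABLE wsd.content_by_time -- hypothetical name, for illustration only
(
`CONTENT` String,
`addTime` DateTime
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(addTime)
ORDER BY addTime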
4. Create a materialized view that parses the Kafka engine table's log data and stores it in the target table
CREATE MATERIALIZED VIEW wsd.log_content TO wsd.content
(
`CONTENT` Nullable(String),
`addTime` DateTime
) AS
SELECT
CONTENT,
now() AS addTime
FROM wsd.log_kafka
5. Produce data into Kafka and check whether the ClickHouse target table receives it
./kafka-console-producer.sh --topic wsdlog --bootstrap-server localhost:9092
The ClickHouse target table successfully receives the data.
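A quick way to verify, assuming a few lines have been typed into the console producer, is to query the target table directly:
SELECT
CONTENT,
addTime
FROM wsd.content
ORDER BY addTime DESC
LIMIT 10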
You can also skip creating the target table explicitly:
CREATE MATERIALIZED VIEW wsd.log_content2
(
`CONTENT` Nullable(String),
`addTime` DateTime
)
ENGINE = MergeTree
ORDER BY addTime
SETTINGS index_granularity = 8192 AS
SELECT
CONTENT,
now() AS addTime
FROM wsd.log_kafka
ClickHouse will automatically create a storage table whose name starts with .inner to hold the data.
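You can list the generated table and query the view directly; the statements below are just a verification sketch:
SHOW TABLES FROM wsd
SELECT count() FROM wsd.log_content2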
Depending on the scenario, the materialized view's SELECT query can be adjusted, for example to parse and store JSON fields. With a view like the one below, log parsing and storage happen in real time as the Kafka log is consumed:
CREATE MATERIALIZED VIEW default.nginx_view TO default.nginx
(
`time64` DateTime64(6, 'Asia/Shanghai'),
`time` DateTime('Asia/Shanghai'),
`time_ch` DateTime,
`host` String
) AS
SELECT
toDateTime64(extract(CONTENT, '^\\s*(\\d{4,4}\\-\\d\\d\\-\\d\\dT\\d\\d:\\d\\d:\\d\\d(\\.\\d{1,6})?)'), 6, 'Asia/Shanghai') AS time64,
toDateTime(time64) AS time,
now() AS time_ch,
JSONExtractString(conn, 'host') AS host
FROM
(
SELECT
CONTENT,
replaceRegexpAll(replaceRegexpAll(CONTENT, '^.+?nginx\\-legal\\s*', ''), ',"request_body":"{.*?}"', '') AS conn
FROM default.nginx_kafka
)
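Before creating a view like this, it can help to test the parsing expressions with a plain SELECT on a sample line; the JSON string below is made up purely for illustration:
SELECT
JSONExtractString('{"host":"www.example.com","status":"200"}', 'host') AS host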