ClickHouse supports a very rich set of data formats for import and export; see the official documentation on Formats for details.
This article walks through how to create Kafka table engines for three of those formats: CSV, JSON, and Avro.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
Required parameters:
- kafka_broker_list: list of Kafka brokers (e.g. broker-1:9092,broker-2:9092)
- kafka_topic_list: list of Kafka topics
- kafka_group_name: name of the Kafka consumer group
- kafka_format: message format (e.g. CSV, JSON, Avro)

Optional parameters:
- kafka_row_delimiter: row delimiter between messages
- kafka_schema: schema of the messages; it comes up again in the Avro section below
- kafka_num_consumers: number of consumers
- kafka_max_block_size: maximum batch size (default: same as max_block_size)
- kafka_skip_broken_messages: tolerance of the Kafka message parser for schema-incompatible messages per block (default: 0). With kafka_skip_broken_messages = N, the engine skips N Kafka messages that cannot be parsed (one message equals one row of data)
- kafka_commit_every_batch: commit every consumed and handled batch instead of a single commit after writing a whole block (default: 0)
- kafka_thread_per_consumer: give each consumer its own thread (default: 0). When enabled, every consumer flushes data independently and in parallel; otherwise, rows from several consumers are squashed together to form one block

CSV format data

Kafka engine table

CREATE TABLE kafka_for_customer (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
kafka_topic_list = 'customer',
kafka_group_name = 'ck',
kafka_format = 'CSV',
kafka_num_consumers = 3,
kafka_max_block_size = 1048576;
Local storage table:

CREATE TABLE customer ON CLUSTER default (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/customer','{replica}') ORDER BY (C_CUSTKEY);
Distributed table:

CREATE TABLE customer_all (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String
) ENGINE = Distributed(default, default, customer, rand());
Materialized view that moves data from the Kafka engine table into the distributed table:

CREATE MATERIALIZED VIEW mv_for_customer TO customer_all AS SELECT * FROM kafka_for_customer;
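Consumption starts as soon as the materialized view is attached. To pause and resume consumption later (a pattern documented for the ClickHouse Kafka engine), detach and re-attach the view:

-- Pause consumption from Kafka
DETACH TABLE mv_for_customer;
-- Resume consumption
ATTACH TABLE mv_for_customer;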
Kafka download:
wget https://mirror.bit.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
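Unpack the archive and work from its directory (a minimal sketch; the directory name follows the archive name):

tar -xzf kafka_2.12-2.6.0.tgz
cd kafka_2.12-2.6.0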
The scripts below live under Kafka's bin directory. Create the topic:
./bin/kafka-topics.sh --create --zookeeper zk-0:2181 --replication-factor 3 --partitions 3 --topic customer
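Optionally, verify the topic and its partition layout (same CLI, same Zookeeper-style flags as above):

./bin/kafka-topics.sh --describe --zookeeper zk-0:2181 --topic customer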
For generating the customer.tbl data, see the ssb-dbgen usage guide. Produce it into the topic:
cat customer.tbl|./bin/kafka-console-producer.sh --broker-list kafka-0:9092 --topic customer
ck :) select * from customer_all limit 10;

SELECT * FROM customer_all LIMIT 10

┌─C_CUSTKEY─┬─C_NAME─────────────┬─C_ADDRESS──────────────┬─C_CITY─────┬─C_NATION───────┬─C_REGION────┬─C_PHONE─────────┬─C_MKTSEGMENT─┐
│         4 │ Customer#000000004 │ 4u58h f                │ EGYPT 4    │ EGYPT          │ MIDDLE EAST │ 14-128-190-5944 │ MACHINERY    │
│         6 │ Customer#000000006 │ g1s,pzDenUEBW3O,2 pxu  │ SAUDI ARA2 │ SAUDI ARABIA   │ MIDDLE EAST │ 30-114-968-4951 │ AUTOMOBILE   │
│         7 │ Customer#000000007 │ 8OkMVLQ1dK6Mbu6WG9     │ CHINA 0    │ CHINA          │ ASIA        │ 28-190-982-9759 │ AUTOMOBILE   │
│         8 │ Customer#000000008 │ j,pZ,Qp,qtFEo0r0c 92qo │ PERU 6     │ PERU           │ AMERICA     │ 27-147-574-9335 │ BUILDING     │
│        11 │ Customer#000000011 │ cG48rYjF3Aw7xs         │ UNITED KI3 │ UNITED KINGDOM │ EUROPE      │ 33-464-151-3439 │ BUILDING     │
│        12 │ Customer#000000012 │ Sb4gxKs7               │ JORDAN 5   │ JORDAN         │ MIDDLE EAST │ 23-791-276-1263 │ HOUSEHOLD    │
│        14 │ Customer#000000014 │ h3GFMzeFf              │ ARGENTINA0 │ ARGENTINA      │ AMERICA     │ 11-845-129-3851 │ FURNITURE    │
│        16 │ Customer#000000016 │ P2IQMff18er            │ IRAN 5     │ IRAN           │ MIDDLE EAST │ 20-781-609-3107 │ FURNITURE    │
│        20 │ Customer#000000020 │ i bGScA                │ RUSSIA 0   │ RUSSIA         │ EUROPE      │ 32-957-234-8742 │ FURNITURE    │
│        21 │ Customer#000000021 │ 42E5BARt               │ INDIA 4    │ INDIA          │ ASIA        │ 18-902-614-8344 │ MACHINERY    │
└───────────┴────────────────────┴────────────────────────┴────────────┴────────────────┴─────────────┴─────────────────┴──────────────┘

10 rows in set. Elapsed: 0.015 sec.

ck :) select count() from customer_all;

SELECT count() FROM customer_all

┌─count()─┐
│ 4696145 │
└─────────┘

1 rows in set. Elapsed: 0.009 sec.
JSON format data

Kafka engine table

Note: for JSON data, set kafka_format to JSONEachRow, not JSON.
CREATE TABLE kafka_for_customer_json (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY LowCardinality(String),
C_NATION LowCardinality(String),
C_REGION LowCardinality(String),
C_PHONE String,
C_MKTSEGMENT LowCardinality(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
kafka_topic_list = 'customer_json',
kafka_group_name = 'ck',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 3,
kafka_max_block_size = 1048576;
The storage table, distributed table, and materialized view are created the same way as in the CSV example.

First, export the data just imported into the customer table as JSON to serve as the raw data:
clickhouse-client -h ckhost -u user --password xxxxxx --query "select * from customer FORMAT JSONEachRow " > customer.json
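For reference, each line of customer.json is one self-contained JSON object, as the JSONEachRow format requires; using the first row from the sample output above, a line looks like this:

{"C_CUSTKEY":4,"C_NAME":"Customer#000000004","C_ADDRESS":"4u58h f","C_CITY":"EGYPT 4","C_NATION":"EGYPT","C_REGION":"MIDDLE EAST","C_PHONE":"14-128-190-5944","C_MKTSEGMENT":"MACHINERY"}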
Then feed customer.json into Kafka (note the topic is customer_json, matching the Kafka engine table above):

cat customer.json|./bin/kafka-console-producer.sh --broker-list kafka-0:9092 --topic customer_json
The records consumed from Kafka can now be queried through the distributed table.
Avro format data

Avro data is a bit more involved: it requires an extra third-party component, Confluent, because both producing and consuming need a schema as a reference when serializing and deserializing the data.

Kafka engine table

Note: for Avro data, set kafka_format to AvroConfluent, not Avro.
CREATE TABLE kafka_for_customer_avro (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY LowCardinality(String),
C_NATION LowCardinality(String),
C_REGION LowCardinality(String),
C_PHONE String,
C_MKTSEGMENT LowCardinality(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
kafka_topic_list = 'customer_avro',
kafka_group_name = 'ck',
kafka_format = 'AvroConfluent',
-- AvroConfluent resolves schemas through the Schema Registry set up below
format_avro_schema_registry_url = 'http://localhost:8081',
kafka_num_consumers = 3,
kafka_max_block_size = 1048576;
The storage table, distributed table, and materialized view again follow the same pattern as the CSV example.
Confluent download:
wget http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.tar.gz
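A minimal unpacking sketch (the top-level directory name confluent-4.0.0 is assumed from the release version; adjust if yours differs):

tar -xzf confluent-oss-4.0.0-2.11.tar.gz
cd confluent-4.0.0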
Configure the schema registry:
vi etc/schema-registry/schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=zk-0:2181
kafkastore.topic=_schemas
debug=false
Start the schema registry:
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
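To confirm the registry is running, probe its REST API; listing subjects should return an empty JSON array on a fresh install:

curl http://localhost:8081/subjects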
Prepare the data

Follow the JSON-format approach, or simply reuse the customer.json data from above.
Prepare the schema
cat customer.avsc
{
  "type": "record",
  "name": "customer",
  "fields": [
    {"name": "C_CUSTKEY", "type": "long"},
    {"name": "C_NAME", "type": "string"},
    {"name": "C_ADDRESS", "type": "string"},
    {"name": "C_CITY", "type": "string"},
    {"name": "C_NATION", "type": "string"},
    {"name": "C_REGION", "type": "string"},
    {"name": "C_PHONE", "type": "string"},
    {"name": "C_MKTSEGMENT", "type": "string"}
  ]
}
Create the topic (customer_avro, to match the Kafka engine table):

./bin/kafka-topics.sh --create --zookeeper zk-0:2181 --replication-factor 3 --partitions 3 --topic customer_avro
Produce the data with the producer shipped with Confluent; the value of --property value.schema is the schema above:

cat customer.json | ./bin/kafka-avro-console-producer --broker-list kafka-0:9092 --topic customer_avro --property key.schema='{"type":"string"}' --property value.schema='{"type":"record","name":"customer","fields":[{"name":"C_CUSTKEY","type":"long"},{"name":"C_NAME","type":"string"},{"name":"C_ADDRESS","type":"string"},{"name":"C_CITY","type":"string"},{"name":"C_NATION","type":"string"},{"name":"C_REGION","type":"string"},{"name":"C_PHONE","type":"string"},{"name":"C_MKTSEGMENT","type":"string"}]}' --property schema.registry.url=http://localhost:8081
The records consumed from Kafka can now be queried through the distributed table.