当前位置:   article > 正文

ClickHouse基本使用总结_clickhouse 建表

clickhouse 建表

查看系统配置

查看系统表

select * from system.clusters;

验证zookeeper
#验证zookeeper是否与当前数据库clickhouse进行了正确的配置

SELECT * FROM system.zookeeper WHERE path = '/clickhouse';

建表

创建本地表


MergeTree,这个引擎本身不具备同步副本的功能,如果指定的是ReplicaMergeTree,会同步到对应的replica上面去。一般在实际应用中,创建分布式表指定的都是Replica的表。

分布式表本身不存储数据,数据存储其实还是由本地表t_cluster完成的。这个dist_t_cluster仅仅做一个代理的作用。

如果在任意节点创建表以后,其他节点都能同步到表结构,说明集群生效。

使用 ReplacingMergeTree

  1. CREATE TABLE default.test ON CLUSTER clickhouse_cluster
  2. (
  3.     name String DEFAULT 'lemonNan' COMMENT '姓名',
  4.     age int DEFAULT 18 COMMENT '年龄',
  5.     gongzhonghao String DEFAULT 'lemonCode' COMMENT '公众号',
  6.     my_time DateTime64(3, 'UTC') COMMENT '时间'
  7. ) ENGINE = ReplacingMergeTree()
  8. PARTITION BY toYYYYMM(my_time)
  9. ORDER BY my_time

使用 MergeTree 

  1. CREATE TABLE t_cluster ON CLUSTER clickhouse_cluster (
  2. id Int16,
  3. name String,
  4. birth Date
  5. )ENGINE = MergeTree()
  6. PARTITION BY toYYYYMM(birth)
  7. ORDER BY id

clickhouse 什么时候用ReplacingMergeTree引擎? 什么时候用 MergeTree 引擎? 
ReplacingMergeTree 引擎和 MergeTree 引擎都是 ClickHouse 中常用的引擎类型,但它们适用场景略有不同。

ReplacingMergeTree 引擎适用于需要在数据更新时保留历史数据的场景。该引擎在进行数据插入时,会检测新数据是否与已有的数据冲突,如果冲突,则会用新数据替换旧数据,并记录旧数据的删除时间。这样就可以在查询时保留历史数据,并且支持撤销和恢复历史数据。

MergeTree 引擎适用于大多数常规的 OLAP 场景。该引擎可以对数据进行快速的排序和压缩,并支持按照某一列或多列进行分区,从而实现快速的查询和聚合。MergeTree 引擎比较适合处理大量的数据和高并发的查询请求。

因此,如果需要在数据更新时保留历史数据,可以使用 ReplacingMergeTree 引擎;如果只需要进行常规的 OLAP 查询,可以使用 MergeTree 引擎
 

ClickHouse 用什么依据判断新数据是否与已有的数据冲突?

ClickHouse 在使用 ReplacingMergeTree 引擎时,会根据表中指定的主键来判断新数据是否与已有数据冲突。具体来说,ClickHouse 会根据主键值进行匹配,如果新数据中的主键值与已有数据中的主键值相同,则认为两条数据冲突。

在没有指定主键的情况下,ClickHouse 会使用所有列作为合并键,将新数据和已有数据进行比较。如果所有列的值都相同,则认为两条数据冲突。

在判断数据冲突时,ClickHouse 包括了对数据类型和大小写的严格比较,因此需要注意数据的精度和格式。同时,ClickHouse 还支持自定义函数来进行特定条件下的判断,以满足更加灵活的业务需求。

例如如下是指定name字段为主键

  1. CREATE TABLE default.test88 ON CLUSTER clickhouse_cluster
  2. (
  3. name String NOT NULL COMMENT '姓名',
  4. age int DEFAULT 18 COMMENT '年龄',
  5. gongzhonghao String DEFAULT 'lemonCode' COMMENT '公众号',
  6. my_time DateTime64(3, 'UTC') COMMENT '时间',
  7. PRIMARY KEY (name)
  8. ) ENGINE = ReplacingMergeTree()
  9. PARTITION BY toYYYYMM(my_time)
  10. ORDER BY name

创建分布式表

  1. CREATE TABLE default.dist_t_cluster
  2. ON CLUSTER clickhouse_cluster as t_cluster
  3. engine = Distributed(clickhouse_cluster, default, t_cluster, rand());

插入测试数据
多插入几条,在任意节点上查看分布式表,都能够看到数据。

insert into dist_t_cluster values(1, 'aaa', '2021-02-01'), (2, 'bbb', '2021-02-02');

分布式表引擎的创建模板:

Distributed(clusterName, databaseName, tableName[, sharding_key])
1、集群标识符(clusterName)
注意不是复制表宏中的标识符,而是<remote_servers>中指定的那个。
2、本地表所在的数据库名称(databaseName)
3、本地表名称(tableName)
4、(可选的)分片键(sharding key)

该键与config.xml中配置的分片权重(weight)一同决定写入分布式表时的路由,即数据最终落到哪个物理表上。它可以是表中一列的原始数据(如site_id),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)。

分布式DDL

在ClickHouse中创建表、删表等DDL操作是一件麻烦的事,需要登录集群中的每一个节点去执行DDL语句,怎么简化这个操作呢?

ClickHouse(即CH)支持集群模式。 可以在DDL语句上附加ON CLUSTER <cluster_name>的语法,使得该DDL语句执行一次即可在集群中所有实例上都执行,简单方便。

一个集群拥有1到多个节点。
CREATE、ALTER、DROP、RENAME、TRUNCATE这些DDL语句,都支持分布式执行
【即如果在集群中的任意一个节点上执行DDL语句,那么集群中的每个节点都会以相同的顺序执行相同的语句,
这样就省去了需要依次去单个节点执行DDL的烦恼】
 

来源:clickhouse 基于集群实现分布式DDL的使用示例及坑_clickhouse insert on cluster_java编程艺术的博客-CSDN博客

分区partition

表中的数据可以按照指定的字段分区存储,每个分区在文件系统中都是都以目录的形式存在。常用时间字段作为分区字段,数据量大的表可以按照小时分区,数据量小的表可以在按照天分区或者月分区,查询时,使用分区字段作为Where条件,可以有效的过滤掉大量非结果集数据。

根据某个字段分区

  1. create table partition_table_test(
  2. id UInt32,
  3. name String,
  4. city String
  5. ) engine = MergeTree()
  6.  order by id
  7. partition by city;

根据时间分区

  1. CREATE TABLE default.test ON CLUSTER clickhouse_cluster
  2. (
  3.     name String DEFAULT 'lemonNan' COMMENT '姓名',
  4.     age int DEFAULT 18 COMMENT '年龄',
  5.     gongzhonghao String DEFAULT 'lemonCode' COMMENT '公众号',
  6.     my_time DateTime64(3, 'UTC') COMMENT '时间'
  7. ) ENGINE = ReplacingMergeTree()
  8. PARTITION BY toYYYYMM(my_time)
  9. ORDER BY my_time

#查询partition相关信息 

select database, table, partition, partition_id, name, path from system.parts where database = 'data_sync' and table = 'test';

#删除partition

alter table data_sync.test drop partition '202203'

 来源:Clickhouse数据表、数据分区partition的基本操作_clickhouse drop partition_Bulut0907的博客-CSDN博客

ClickHouse与Kafak同步

同步流程图

数据表


# 创建数据表

CREATE DATABASE IF NOT EXISTS data_sync;
  1. CREATE TABLE IF NOT EXISTS data_sync.test
  2. (
  3.     name String DEFAULT 'lemonNan' COMMENT '姓名',
  4.     age int DEFAULT 18 COMMENT '年龄',
  5.     gongzhonghao String DEFAULT 'lemonCode' COMMENT '公众号',
  6.     my_time DateTime64(3, 'UTC') COMMENT '时间'
  7. ) ENGINE = ReplacingMergeTree()
  8. PARTITION BY toYYYYMM(my_time)
  9. ORDER BY my_time

引擎表


# 创建 kafka 引擎表, 地址: 172.16.16.4, topic: lemonCode

  1. CREATE TABLE IF NOT EXISTS data_sync.test_queue(
  2.     name String,
  3.     age int,
  4.     gongzhonghao String, 
  5.     my_time DateTime64(3, 'UTC')
  6. ) ENGINE = Kafka
  7. SETTINGS
  8.   kafka_broker_list = '172.16.16.4:9092',
  9.   kafka_topic_list = 'lemonCode',
  10.   kafka_group_name = 'lemonNan',
  11.   kafka_format = 'JSONEachRow',
  12.   kafka_row_delimiter = '\n',
  13.   kafka_schema = '',
  14.   kafka_num_consumers = 1

注意:setting中还有两个重要参数

订阅Kafka数据的参数kafka_thread_per_consumerkafka_num_consumers都与消费者线程的数量相关。具体作用如下:

  • kafka_thread_per_consumer参数:该参数表示每个Kafka消费者的线程数量。默认值为1,即每个消费者使用一个线程。如果需要提高消费速度,可以增加该参数的值以增加消费者线程数量。

  • kafka_num_consumers参数:该参数表示创建的Kafka消费者数量。默认值为1,即只创建一个消费者。如果需要提高消费速度,可以增加该参数的值以增加消费者数量。

需要注意的是,增加消费者线程的数量和消费者数量可能会提高消费速度,但也会增加ClickHouse服务器的负载。因此,在设置这些参数时需要根据实际情况进行权衡。同时,还需要根据Kafka的分区数量、数据量大小等因素进行适量的调整。


 物化视图


# 创建物化视图

  1. CREATE MATERIALIZED VIEW IF NOT EXISTS data_sync.test_mv TO data_sync.test AS
  2. SELECT name, age, gongzhonghao, my_time FROM data_sync.test_queue;

kafka订阅思路

1.可以将kafka数据发送到分布式表中,发挥集群存储的优势。

创建本地表:

  1. CREATE TABLE data_sync.test ON CLUSTER clickhouse_cluster
  2. (
  3. name String NOT NULL COMMENT '姓名',
  4. age int DEFAULT 18 COMMENT '年龄',
  5. gongzhonghao String DEFAULT 'lemonCode' COMMENT '公众号',
  6. my_time DateTime64(3, 'UTC') COMMENT '时间',
  7. ) ENGINE = ReplacingMergeTree()
  8. PARTITION BY toYYYYMM(my_time)
  9. ORDER BY name

创建分布式表:

  1. CREATE TABLE data_sync.test_dist
  2. ON CLUSTER clickhouse_cluster as test
  3. engine = Distributed(clickhouse_cluster, data_sync, test, rand());

创建引擎表:

  1. CREATE TABLE IF NOT EXISTS data_sync.test_queue(
  2.     name String,
  3.     age int,
  4.     gongzhonghao String, 
  5.     my_time DateTime64(3, 'UTC')
  6. ) ENGINE = Kafka
  7. SETTINGS
  8.   kafka_broker_list = '172.16.16.4:9092',
  9.   kafka_topic_list = 'lemonCode',
  10.   kafka_group_name = 'lemonNan',
  11.   kafka_format = 'JSONEachRow',
  12.   kafka_row_delimiter = '\n',
  13.   kafka_schema = '',
  14.   kafka_num_consumers = 1

物化视图:

注意其中的目标表指定到分布式表中。

  1. CREATE MATERIALIZED VIEW IF NOT EXISTS data_sync.test_mv TO data_sync.test_dist AS
  2. SELECT name, age, gongzhonghao, my_time FROM data_sync.test_queue;

 2.可以在多个节点创建引擎表和物化视图,多个节点同时接入kafka数据。

数据模拟


下面是开始模拟流程图的数据走向,已安装 Kafka 的可以跳过安装步骤。

安装 kafka
kafka 这里为了演示安装的是单机

# 启动 zookeeper

docker run -d --name zookeeper -p 2181:2181  wurstmeister/zookeeper


# 启动 kafka, KAFKA_ADVERTISED_LISTENERS 后的 ip地址为机器ip

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.16.4:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka


使用kafka命令发送数据
# 启动生产者,向 topic lemonCode 发送消息

kafka-console-producer.sh --bootstrap-server 172.16.16.4:9092 --topic lemonCode


# 发送以下消息

  1. {"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.001"}
  2. {"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.001"}
  3. {"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 18:00:00.002"}
  4. {"name":"lemonNan","age":20,"gongzhonghao":"lemonCode","my_time":"2022-03-06 23;59:59.002"}


查看 Clickhouse 的数据表

select * from data_sync.test;

来源:

https://www.cnblogs.com/wuhaonan/p/15978470.html

其他命令

如果需要修改表结构或者其他目标表的调整,可以先停止数据订阅,更改好了以后再开始订阅。

注意如果多节点都有引擎表,每个节点都要执行。

#停止订阅

DETACH TABLE data_sync.test_queue;

#开始订阅

ATTACH TABLE data_sync.test_queue;

 删除视图:

drop view data_sync.test_mv;

去重问题

 MergeTree表引擎无法对相同主键的数据进行去重,ClickHouse提供了ReplacingMergeTree引擎,可以针对相同主键的数据进行去重,它能够在合并分区时删除重复的数据。值得注意的是,ReplacingMergeTree只是在一定程度上解决了数据重复问题,但是并不能完全保障数据不重复。

ReplacingMergeTree
相对于MergeTree,只是多一个去重的功能。根据order by的字段去重

去重时机

数据去重只会在合并的过程中出现。合并会在未知的时间在后台进行,所以无法预先做出计划,有一些数据可能仍未被处理。

去重范围

去重只会在分区内部进行去重,不能执行跨分区的去重

因此,ReplacingMergeTree能力有限,不能保证没有重复的数据出现。
 

示例:

  1. CREATE TABLE default.test2
  2. (
  3. id String,
  4. pubtime DateTime
  5. )
  6. ENGINE = ReplacingMergeTree()
  7. PARTITION BY toYYYYMMDD(pubtime)
  8. ORDER BY id
  9. PRIMARY KEY id;

插入数据: 

  1. INSERT INTO default.test2 (id, pubtime) VALUES ('1111', '2023-01-01 00:00:00');
  2. INSERT INTO default.test2 (id, pubtime) VALUES ('1111', '2023-01-01 00:00:00');
  3. INSERT INTO default.test2 (id, pubtime) VALUES ('1111', '2023-01-01 00:00:00');

表中插入了3条数据

 手动触发数据合并

optimize table default.test2

这个时候看,表中只剩下了一条数据

 来源:

ClickHouse学习笔记_clickhouse optimize_不熬夜的靓仔的博客-CSDN博客

网易经验规整:ClickHouse开发与使用规范大全

AggregatingMergeTree

 

AggregatingMergeTree引擎在以下场景下使用:

1. 聚合查询:当需要对大量数据进行聚合操作时,AggregatingMergeTree可以提供高效的聚合查询性能。它会在数据写入时预先聚合数据,并保留聚合结果,以便后续查询时可以直接获取聚合结果,而不需要进行实时的聚合运算。

2. 时间序列数据:AggregatingMergeTree特别适用于存储和查询时间序列数据,如日志、指标数据等。它可以根据时间戳进行数据的分区和排序,同时支持灵活的时间窗口聚合查询,例如按小时、天、周或月进行聚合。

3. 数据压缩:AggregatingMergeTree引擎还支持数据压缩,可以大幅减少存储空间占用。它使用了MergeTree引擎的合并和压缩技术,可以根据数据的时间维度进行合并和压缩,以优化存储空间的利用效率。

总之,AggregatingMergeTree适用于需要进行大规模聚合查询、处理时间序列数据和优化存储空间的场景。它可以提供高性能的聚合查询能力,并节省存储空间。

#注意聚合表在DBeaver中直接双击无法查看

创建原始表

CREATE TABLE user_activity

(

user_id String,

activity_date Date,

activity_count Int32

)

ENGINE = MergeTree()

PARTITION BY toYYYYMM(activity_date)

ORDER BY (activity_date, user_id);

 

创建聚合表

CREATE TABLE user_activity_summary

(

user_id String,

activity_date Date,

activity_sum AggregateFunction(sum, Int32)

)

ENGINE = AggregatingMergeTree()

PARTITION BY toYYYYMM(activity_date)

ORDER BY (activity_date, user_id)

SETTINGS index_granularity = 8192;

原始表入库

INSERT INTO user_activity (user_id, activity_date, activity_count)

VALUES ('user1', '2022-01-01', 10),

('user2', '2022-01-01', 5),

('user3', '2022-01-01', 15),

('user1', '2022-01-02', 8),

('user2', '2022-01-02', 12),

('user3', '2022-01-02', 6),

('user1', '2022-01-03', 20),

('user2', '2022-01-03', 10),

('user3', '2022-01-03', 18);

 

聚合数据到聚合表

在示例中,我们使用INSERT INTO语句将数据插入到user_activity_summary表中。在SELECT子句中,我们使用sumState函数对activity_count列进行聚合。GROUP BY子句将按user_id和activity_date进行分组。

通过这个例子,我们可以将原始的用户活动数据按天进行聚合,在user_activity_summary表中获得按用户和日期聚合的结果,以方便后续的查询和分析操作

INSERT INTO user_activity_summary (user_id, activity_date, activity_sum)
SELECT
    user_id,
    activity_date,
    sumState(activity_count)
FROM user_activity
GROUP BY user_id, activity_date; 

查询聚合表数据

SELECT

user_id,

activity_date,

sumMerge(activity_sum) AS total_activity_sum

FROM user_activity_summary where activity_date='2022-01-02'

GROUP BY user_id, activity_date

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号