
Consuming Kafka Data into Doris: Two Implementation Approaches

This article covers consuming data from Kafka and syncing it into Doris. Other OLAP analytical databases handle this differently; ClickHouse, for example, provides a Kafka engine table that consumes the topic, and a materialized view then pipes the consumed rows into the target physical table. Doris has no equivalent Kafka engine table, so how do we sync Kafka data into it?
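For comparison, a minimal sketch of that ClickHouse pattern looks roughly like this (table names, columns, broker address, and topic are made-up placeholders, not from any real setup):

  -- Kafka engine table: consumes the topic (placeholder broker/topic/group)
  CREATE TABLE queue (
      id UInt64,
      name String
  ) ENGINE = Kafka
  SETTINGS kafka_broker_list = 'host:9092',
           kafka_topic_list = 'demo_topic',
           kafka_group_name = 'demo_group',
           kafka_format = 'JSONEachRow';

  -- Physical table that actually stores the data
  CREATE TABLE dst (
      id UInt64,
      name String
  ) ENGINE = MergeTree ORDER BY id;

  -- Materialized view that moves rows from the Kafka table into the physical table
  CREATE MATERIALIZED VIEW queue_to_dst TO dst AS
  SELECT id, name FROM queue;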

   

The rest of this article walks through two approaches to syncing Kafka data into Doris:

1. Using Routine Load, Doris's built-in data import mechanism

  • Kafka messages are plain delimited strings, e.g. separated by '|'

 Create the receiving table

  CREATE TABLE IF NOT EXISTS sea.user
  (
      siteid INT DEFAULT '10',
      citycode SMALLINT,
      username VARCHAR(32) DEFAULT '',
      pv BIGINT SUM DEFAULT '0'
  )
  AGGREGATE KEY(siteid, citycode, username)
  DISTRIBUTED BY HASH(siteid) BUCKETS 10
  PROPERTIES("replication_num" = "1");

Routine Load statement for the Kafka topic

  CREATE ROUTINE LOAD sea.test ON user
  COLUMNS TERMINATED BY "|",
  COLUMNS(siteid, citycode, username, pv)
  PROPERTIES(
      "desired_concurrent_number" = "1",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200")
  FROM KAFKA(
      "kafka_broker_list" = "192.168.18.129:9092",
      "kafka_topic" = "doris",
      "property.group.id" = "gid",
      "property.client.id" = "cid",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING");


 

Note: sea is the database name and must be specified on the job name (test). The table user, on the other hand, must not be qualified with a database name, otherwise the statement is not recognized and an error is thrown.
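Put differently, a sketch of the naming rule described above:

  -- Correct: the database qualifies the job name; the ON table is unqualified
  CREATE ROUTINE LOAD sea.test ON user ...

  -- Incorrect: qualifying the table instead of the job name is rejected
  CREATE ROUTINE LOAD test ON sea.user ...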

  • Kafka messages are JSON

    Create the receiving table

    create table dev_ods.ods_user_log(
        `distinct_id` string not null COMMENT 'member id',
        `time` bigint not null COMMENT 'timestamp',
        `event_at` datetime COMMENT 'event time: date, hour, minute, second',
        `_track_id` string COMMENT 'tracking id',
        `login_id` string COMMENT 'login id',
        `lib` string COMMENT 'lib',
        `anonymous_id` string COMMENT 'anonymous id',
        `_flush_time` bigint COMMENT 'flush time',
        `type` string COMMENT 'type',
        `event` string COMMENT 'event type',
        `properties` string COMMENT 'event properties',
        `identities` string COMMENT 'identity info',
        `dt` date COMMENT 'event date'
    )
    primary key (distinct_id, `time`)
    distributed by hash(distinct_id);

    Routine Load statement for the Kafka topic, parsing the JSON data

  CREATE ROUTINE LOAD dev_ods.user_log ON ods_user_log
  COLUMNS(distinct_id, time, _track_id, login_id, lib, anonymous_id, _flush_time, type, event, properties, identities,
          dt = from_unixtime(time/1000, '%Y%m%d'),
          event_at = from_unixtime(time/1000, 'yyyy-MM-dd HH:mm:ss'))
  PROPERTIES
  (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false",
      "format" = "json"
  )
  FROM KAFKA
  (
      "kafka_broker_list" = "10.150.20.12:9092",
      "kafka_topic" = "bigDataSensorAnalyse",
      "property.group.id" = "test_group_2",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING",
      "property.enable.auto.commit" = "false"
  );

JSON structure: the value of the properties field is itself a JSON object. dt and event_at are not present in the Kafka messages; they are derived during the load and written into the table.
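For reference, one message on the topic might look roughly like the following (the values are invented for illustration; the real payload carries the fields of ods_user_log above, with properties holding a nested JSON object):

  {
    "distinct_id": "u_10001",
    "time": 1589191587000,
    "_track_id": "t_001",
    "login_id": "login_10001",
    "lib": "web-js",
    "anonymous_id": "anon_001",
    "_flush_time": 1589191588000,
    "type": "track",
    "event": "page_view",
    "properties": {"page": "home", "duration": 30},
    "identities": "login_10001"
  }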

Notes:
1) If the JSON data starts with an array and each object in the array is one record, strip_outer_array must be set to true, which flattens the array.
2) If the JSON data starts with an array and each object in the array is one record, then when configuring jsonpaths, the ROOT node is effectively each object inside the array.

  Two JSON data formats are supported:
  1) {"category":"a9jadhx","author":"test","price":895}
  2) [
       {"category":"a9jadhx","author":"test","price":895},
       {"category":"axdfa1","author":"EvelynWaugh","price":1299}
     ]


 

 These are the two JSON data formats that Doris currently supports parsing.

When the JSON has the following array structure:

{  "RECORDS": [    {      "category": "11",      "title": "SayingsoftheCentury",      "price": 895,      "timestamp": 1589191587    },    {      "category": "22",      "author": "2avc",      "price": 895,      "timestamp": 1589191487    },    {      "category": "33",      "author": "3avc",      "title": "SayingsoftheCentury",      "timestamp": 1589191387    }  ]}

The corresponding Routine Load statement is:

  -- User-specified root node via json_root
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  COLUMNS(category, author, price, timestamp, dt = from_unixtime(timestamp, '%Y%m%d'))
  PROPERTIES
  (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false",
      "format" = "json",
      "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
      "strip_outer_array" = "true",
      "json_root" = "$.RECORDS"
  )
  FROM KAFKA
  (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2",
      "kafka_offsets" = "0,0,0"
  );


 

Checking the Routine Load status

  -- Show the status of all routine load jobs under example_db
  use example_db;
  SHOW ALL ROUTINE LOAD;
 
  -- Check the status of a specific routine load job
  SHOW ALL ROUTINE LOAD FOR datasource_name.kafka_load;

  -- Common routine load commands:
  -- a) Pause a routine load job
  PAUSE ROUTINE LOAD FOR datasource_name.kafka_load;
  -- b) Resume a routine load job
  RESUME ROUTINE LOAD FOR datasource_name.kafka_load;
  -- c) Stop a routine load job
  STOP ROUTINE LOAD FOR datasource_name.kafka_load;
  -- d) Show routine load jobs
  SHOW [ALL] ROUTINE LOAD FOR datasource_name.kafka_load;
  -- e) Show routine load tasks
  SHOW ROUTINE LOAD TASK WHERE JobName = "kafka_load";

  -- Query the loaded data
  SELECT * FROM datasource_name.table_name LIMIT 10;

Parameter notes

  1) OFFSET_BEGINNING: subscribe from the earliest position that still has data.
  2) OFFSET_END: subscribe from the end of the topic, i.e. only new messages (see the snippet below).
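For example, to have a job consume only messages produced after it is created, the FROM KAFKA clause can set the default offset to OFFSET_END (a sketch reusing the broker and topic from the first example above):

  FROM KAFKA(
      "kafka_broker_list" = "192.168.18.129:9092",
      "kafka_topic" = "doris",
      "property.group.id" = "gid",
      "property.kafka_default_offsets" = "OFFSET_END");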


Note: the examples above connect to Kafka without authentication; for more options, see the official documentation link at the bottom of the article.

2. Writing Kafka data into Doris with Flink SQL

  create table flink_test_1 (
      id BIGINT,
      day_time VARCHAR,
      amount BIGINT,
      proctime AS PROCTIME ()
  )
  with (
      'connector' = 'kafka',
      'topic' = 'flink_test',
      'properties.bootstrap.servers' = '10.150.60.5:9092',
      'properties.group.id' = 'flink_gp_test1',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true'
  );

  CREATE TABLE sync_test_1(
      day_time string,
      total_gmv bigint,
      PRIMARY KEY (day_time) NOT ENFORCED
  ) WITH (
      'connector' = 'starrocks',
      'jdbc-url' = 'jdbc:mysql://10.150.60.2:9030',
      'load-url' = '10.150.60.2:8040;10.150.60.11:8040;10.150.60.17:8040',
      'database-name' = 'test',
      'table-name' = 'sync_test_1',
      'username' = 'root',
      'password' = 'bigdata1234',
      'sink.buffer-flush.max-rows' = '1000000',
      'sink.buffer-flush.max-bytes' = '300000000',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.max-retries' = '3'
  );

  INSERT INTO sync_test_1
  SELECT day_time, SUM(amount) AS total_gmv FROM flink_test_1 GROUP BY day_time;
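Note that the sink table above is declared with the StarRocks Flink connector. When the target cluster is Doris, the flink-doris-connector can be used instead; a rough sketch follows, where the FE HTTP address (port 8030) and the label prefix are assumptions to be adjusted for the actual cluster:

  -- Roughly equivalent sink using flink-doris-connector
  -- (FE address and label prefix below are placeholders, not from the original article)
  CREATE TABLE sync_test_1_doris(
      day_time STRING,
      total_gmv BIGINT
  ) WITH (
      'connector' = 'doris',
      'fenodes' = '10.150.60.2:8030',
      'table.identifier' = 'test.sync_test_1',
      'username' = 'root',
      'password' = 'bigdata1234',
      'sink.label-prefix' = 'doris_sync_test_1'
  );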


    The Flink SQL approach above is provided for reference; more Flink SQL related content will be covered step by step in later articles.

Doris official documentation link

References

 Importing Kafka data into Doris
