赞
踩
#debezium connector 表需要开cdc,库要开快照隔离
#写入topic ,默认自动创建topic 服务名+表名
curl -i -X POST -H “Accept:application/json” http://kafka地址/connectors/ -H “Content-Type: application/json” -d ‘{
“name”:“inventory-connector12”,
“config”:{
“connector.class”:“io.debezium.connector.sqlserver.SqlServerConnector”,
“key.converter”:“io.confluent.connect.avro.AvroConverter”,
“value.converter”:“io.confluent.connect.avro.AvroConverter”,
“key.converter.schema.registry.url”:“”,
“value.converter.schema.registry.url”:“”,
“database.history.consumer.security.protocol”: “SASL_PLAINTEXT”,
“database.history.consumer.sasl.mechanism”: “SCRAM-SHA-256”,
“database.history.consumer.sasl.jaas.config”: “org.apache.kafka.common.security.scram.ScramLoginModule required username=”" password=“”;“,
“database.history.producer.security.protocol”: “SASL_PLAINTEXT”,
“database.history.producer.sasl.mechanism”: “SCRAM-SHA-256”,
“database.history.producer.sasl.jaas.config”: “org.apache.kafka.common.security.scram.ScramLoginModule required username=”” password=“”;“,
“tasks.max”:“1”,
“database.server.name”:“test”, #唯一
“database.hostname”:”“,
“database.port”:”“,
“database.user”:”“,
“database.password”:”",
“table.include.list”:“dbo.test”, #必须加dbo,才会自动创建topic
“database.dbname”:“CDC_TEST”,
“database.history.kafka.bootstrap.servers”:“10.2.255.70:9092”,
“database.history.kafka.topic”:“testha10” #唯一
}
}’
可能的额外参数:
#数据写入tidb通过flinksql
set execution.checkpointing.interval=10s;
SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=2s;
SET table.exec.mini-batch.size=5000;
SET table.optimizer.distinct-agg.split.enabled=true;
SET table.exec.source.cdc-events-duplicate=true;
#source kafka
CREATE TABLE test_result (
id int,
name varchar,
age varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
‘connector’ = ‘kafka’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘topic’ = ‘test.dbo.test’,
‘properties.bootstrap.servers’ = ‘’,
‘properties.group.id’ = ‘testdbotest’,
‘format’ = ‘debezium-avro-confluent’,
‘debezium-avro-confluent.url’ = ‘’
);
‘connector’ = ‘kafka’,
‘topic’ = ‘test_t_regin2g’,
‘scan.startup.mode’ = ‘earliest-offset’,
–‘scan.startup.mode’ = ‘latest-offset’,
‘properties.bootstrap.servers’ = ‘’,
‘json.ignore-parse-errors’ = ‘true’,
‘sink.semantic’ = ‘exactly-once’,
‘format’ = ‘json’
#sink mysql
CREATE TABLE save_result(
id int,
name varchar,
age varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://***/test’,
‘username’ = ‘root’,
‘password’ = ‘’,
‘table-name’ = ‘testcdc_1’
);
INSERT INTO save_result
SELECT *
FROM test_result;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。