当前位置:   article > 正文

debezium sqlserver connnector_debezium-connector-sqlserver

debezium-connector-sqlserver

#debezium connector 表需要开cdc,库要开快照隔离
#写入topic ,默认自动创建topic 服务名+表名
curl -i -X POST -H “Accept:application/json” http://kafka地址/connectors/ -H “Content-Type: application/json” -d ‘{
“name”:“inventory-connector12”,
“config”:{
“connector.class”:“io.debezium.connector.sqlserver.SqlServerConnector”,
“key.converter”:“io.confluent.connect.avro.AvroConverter”,
“value.converter”:“io.confluent.connect.avro.AvroConverter”,
“key.converter.schema.registry.url”:“”,
“value.converter.schema.registry.url”:“”,
“database.history.consumer.security.protocol”: “SASL_PLAINTEXT”,
“database.history.consumer.sasl.mechanism”: “SCRAM-SHA-256”,
“database.history.consumer.sasl.jaas.config”: “org.apache.kafka.common.security.scram.ScramLoginModule required username=”" password=“”;“,
“database.history.producer.security.protocol”: “SASL_PLAINTEXT”,
“database.history.producer.sasl.mechanism”: “SCRAM-SHA-256”,
“database.history.producer.sasl.jaas.config”: “org.apache.kafka.common.security.scram.ScramLoginModule required username=”” password=“”;“,
“tasks.max”:“1”,
“database.server.name”:“test”, #唯一
“database.hostname”:”“,
“database.port”:”“,
“database.user”:”“,
“database.password”:”",
“table.include.list”:“dbo.test”, #必须加dbo,才会自动创建topic
“database.dbname”:“CDC_TEST”,
“database.history.kafka.bootstrap.servers”:“10.2.255.70:9092”,
“database.history.kafka.topic”:“testha10” #唯一
}
}’
可能的额外参数:
可能的额外参数

#数据写入tidb通过flinksql
set execution.checkpointing.interval=10s;
SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=2s;
SET table.exec.mini-batch.size=5000;
SET table.optimizer.distinct-agg.split.enabled=true;
SET table.exec.source.cdc-events-duplicate=true;

#source kafka
CREATE TABLE test_result (
id int,
name varchar,
age varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
‘connector’ = ‘kafka’,
‘scan.startup.mode’ = ‘earliest-offset’,
‘topic’ = ‘test.dbo.test’,
properties.bootstrap.servers’ = ‘’,
‘properties.group.id’ = ‘testdbotest’,
‘format’ = ‘debezium-avro-confluent’,
‘debezium-avro-confluent.url’ = ‘’
);

‘connector’ = ‘kafka’,
‘topic’ = ‘test_t_regin2g’,
‘scan.startup.mode’ = ‘earliest-offset’,
–‘scan.startup.mode’ = ‘latest-offset’,
‘properties.bootstrap.servers’ = ‘’,
‘json.ignore-parse-errors’ = ‘true’,
‘sink.semantic’ = ‘exactly-once’,
‘format’ = ‘json’

#sink mysql
CREATE TABLE save_result(
id int,
name varchar,
age varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://***/test’,
‘username’ = ‘root’,
‘password’ = ‘’,
‘table-name’ = ‘testcdc_1’
);

INSERT INTO save_result
SELECT *
FROM test_result;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/583808
推荐阅读
  

闽ICP备14008679号