赞
踩
环境:Windows
mysql5.7
apache-zookeeper-3.5.9-bin
kafka_2.11-1.1.1
canal.deployer-1.1.7-SNAPSHOT
如果不想看步骤可以直接下载我打包好的文件,修改相关数据库配置就行
https://download.csdn.net/download/weixin_38738049/87441074?spm=1001.2014.3001.5503
-- Dedicated account that canal uses to connect to MySQL and read the binlog.
mysql> create user 'canal'@'%' identified by 'canal';
-- canal needs select (table metadata), replication slave (binlog dump) and
-- replication client (show master status) on every database, hence *.* .
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
mysql> flush privileges;
# Enable the binary log; canal reads row changes from it.
log-bin=mysql-bin
# canal parses row events, so the binlog has to be written in ROW format.
binlog-format=ROW
# MySQL 5.7 requires server-id whenever log-bin is enabled.
# It must differ from canal.instance.mysql.slaveId (1234 in instance.properties).
server-id=1
# Only log changes made to the stpnew database.
binlog-do-db=stpnew
将 conf 目录下的 zoo_sample.cfg 复制一份并重命名为 zoo.cfg
修改配置
# Use forward slashes: zoo.cfg is read as a Java properties file, where a
# backslash starts an escape sequence ("\t" would be read as a TAB).
dataDir=/tmp/zookeeper
启动 apache-zookeeper-3.5.9-bin\bin\zkServer.cmd
启动成功如下图
下载略
修改配置文件
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://你的本地ip:9092
log.dirs=/tmp/kafka-logs
启动kafka
bin\windows\kafka-server-start.bat config\server.properties
如果启动报错(常见原因是 Java 或 Kafka 的安装路径中含有空格),
请修改 bin\windows\kafka-run-class.bat,给 -cp 后面的 %CLASSPATH% 加上双引号,修改后的这一行如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
启动命令
bin\windows\kafka-server-start.bat config\server.properties
创建topic
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic binlog_stpnew
修改配置文件canal.properties 如下
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
# (set to kafka so that parsed binlog events are published to Kafka)
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
# must point at the broker configured in Kafka's server.properties
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =
修改instance.properties
参考如下配置,我这里只同步 t_parking_record 和 t_charge_record 两张表
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
#canal.instance.rds.accesskey=
#canal.instance.rds.secretkey=
#canal.instance.rds.instanceId=

# table meta tsdb info
#canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# Use the replication account created with "create user 'canal'@'%'";
# replace both values if your account is named differently.
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=stpnew\\.t_parking_record,stpnew\\.t_charge_record
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=binlog_stpnew
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
# Rows with the same id are routed to the same partition so that they are consumed in order.
# NOTE(review): canal.mq.partitionsNum is commented out and the topic was created with one
# partition; confirm whether partitionsNum must be set for this hash rule to take effect.
canal.mq.partitionHash=stpnew.t_parking_record:id,stpnew.t_charge_record:parkingrecordid
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
配置完启动
canal.deployer-1.1.7-SNAPSHOT\bin\startup.bat
此时修改数据,就会收到canal的数据
查看kafkatopic
bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic binlog_stpnew --from-beginning
配置ck
spring:
  # ClickHouse datasource
  datasource:
    url: jdbc:clickhouse://ip:8123/stpnew?socket_timeout=300000
    username: default
    password: 123456
    driver-class-name: ru.yandex.clickhouse.ClickHouseDriver
配置kafka
# Nest this block under the top-level "spring:" key: the consumer code reads
# spring.kafka.template.default-topic, spring.kafka.listener.concurrency and
# spring.kafka.consumer.properties.group.id.
kafka:
  bootstrap-servers: 127.0.0.1:9092
  template:
    # default topic id
    default-topic: binlog_stpnew
  consumer:
    # whether offsets are committed automatically
    enable-auto-commit: false
    auto-offset-reset: latest
    # serializer / deserializer classes shipped with Kafka
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      group.id: dataeye
      # maximum number of records returned by one batch poll
      max.poll.records: 1000
      fetch.min.bytes: 10240
      fetch.max.wait.ms: 10000
      max.partition.fetch.bytes: 104857600
      # consumer session timeout (a rebalance is triggered when no heartbeat arrives within it)
      session.timeout.ms: 120000
      # consumer request timeout
      request.timeout.ms: 180000
  listener:
    # number of threads run by the listener container
    concurrency: 3
    # do not fail application startup when a listened topic does not exist
    missing-topics-fatal: false
编写消费者
import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.reformer.dataeye.cache.JedisClient; import com.reformer.dataeye.handler.AbstractTableHandler; import com.reformer.dataeye.handler.TableStraegyFactory; import com.reformer.dataeye.util.StringUtil; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * @create 2021年10月14日 上午10:31:11 * @author Administrator * @version */ @SuppressWarnings("unchecked") @Slf4j @Configuration @Component public class DataConsumer { @Autowired protected JedisClient jedisClient; @Autowired private TableStraegyFactory tableStraegyFactory; @Value("${spring.kafka.listener.concurrency:1}") private int concurrency; protected String RETRY_KEY = "dataeye_retry"; @KafkaListener(topics = "#{'${spring.kafka.template.default-topic}'.split(',')}", containerFactory = "manualListenerContainerFactory") public void 
process(List<ConsumerRecord<String, String>> records, Acknowledgment ackgt) { Map<String, List<JSONObject>> tableMap = new ConcurrentHashMap<String, List<JSONObject>>(); ConsumerRecord<String, String> firstRecord = records.get(0); log.info("consumer:thread={},topic={},offset={},size={}", Thread.currentThread().getName(),firstRecord.topic(),firstRecord.offset(),records.size()); try { //获取 for (ConsumerRecord<String, String> record : records) { Optional<String> kafkaMessage = (Optional<String>) Optional .ofNullable(record.value()); if (!kafkaMessage.isPresent()) { continue; } JSONObject json = JSONObject.parseObject(kafkaMessage.get()); String tableName = json.getString("table"); if (StringUtil.isBlank(tableName) || tableStraegyFactory.getTableStraegy(tableName) == null) { //没有定义的表处理类直接返回 continue; } //数据分组聚合 List<JSONObject> listJson = tableMap.get(tableName); if (listJson == null) { listJson = new ArrayList<JSONObject>(); } if (!listJson.contains(json)) { listJson.add(json); } tableMap.put(tableName, listJson); } if (tableMap.isEmpty() || tableMap.size() == 0) { return; } for (Map.Entry<String, List<JSONObject>> entry : tableMap.entrySet()) { List<JSONObject> array = entry.getValue(); log.info("insert:size={},table={}", array.size(), entry.getKey()); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(entry.getKey()); if (tableHanlder != null && array.size() > 0) { boolean ret = tableHanlder.tableProcess(array); if (!ret) { this.putRetyQueue(entry.getKey(), array); } } } } catch (Exception ex) { //捕获异常,防止某条异常数据或者意外的失败导致无法继续消费消息 log.error("process_error:message={}" + ex.getMessage(), ex); ex.printStackTrace(); } finally { //提交偏移量 ackgt.acknowledge(); } } /** * MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, * 手动调用Acknowledgment.acknowledge()后提交 * * @param consumerFactory * @return */ @Bean("manualListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> 
manualListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); //设置超时时间 factory.getContainerProperties().setPollTimeout(5000); //设置提交偏移量的方式 factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); log.info("init_props,consumer={},factory={}", JSON.toJSONString(consumerFactory.getConfigurationProperties()), JSON.toJSONString(factory.getContainerProperties())); factory.setConcurrency(concurrency); factory.setBatchListener(true); return factory; } /** * 数据库插入失败,保存至重试队列,后面进行定时重试 * * @param list */ private synchronized void putRetyQueue(String table, List<JSONObject> array) { String value = jedisClient.hget(RETRY_KEY, table); if (StringUtil.isNotBlank((value))) { array.addAll(JSONObject.parseObject(value, List.class)); } jedisClient.hset(RETRY_KEY, table, JSONObject.toJSONString(array)); log.info("putRetyQueue,table={},size={},list={}", table, array.size(), array); } /** * 批量插入重试任务,每隔2分钟执行一次 * * @throws Exception */ @Scheduled(fixedDelay = 120000, initialDelay = 10000) public void retry() throws Exception { String lockKey = RETRY_KEY + "_lock"; try { if (!jedisClient.tryLock(lockKey, 6000, 6000)) { return; } Map<String, String> map = jedisClient.hgetAll(RETRY_KEY); Set<Map.Entry<String, String>> entrySet = map.entrySet(); for (Map.Entry<String, String> entry : entrySet) { List<JSONObject> array = JSONObject.parseObject(entry.getValue(), List.class); String table = entry.getKey(); log.info("retry,table={},size={},array={}", table, array.size(), array); AbstractTableHandler<?> tableHanlder = tableStraegyFactory.getTableStraegy(table); boolean ret = tableHanlder.tableProcess(array); if (ret) { long l = jedisClient.hdel(RETRY_KEY, table); log.info("retry_success,table={},size={},l={}", table, array.size(), l); } } } catch (Exception e) { log.error("-----message=" + e.getMessage(), e); 
} finally { jedisClient.unlock(lockKey); } } }
处理数据抽象类
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.reformer.dataeye.cache.JedisClient;
import com.reformer.dataeye.consumer.DataConsumer;
import com.reformer.dataeye.util.ThreadLocalDateUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Base class of the per-table handlers that turn canal messages into ClickHouse rows.
 *
 * @param <T> row type produced by the concrete handler
 */
@Slf4j
public abstract class AbstractTableHandler<T> {

    @Autowired
    protected JedisClient jedisClient;

    protected final static ThreadLocalDateUtil threadDateUtil = new ThreadLocalDateUtil("yyyyMM");

    protected final static ThreadLocalDateUtil threadDateUtil2 = new ThreadLocalDateUtil("yyyy-MM-dd HH:mm:ss");

    @Value("${spring.kafka.consumer.properties.group.id:dataeye}")
    private String groupId;

    // Thread pool for concurrent business processing.
    protected ForkJoinPool forkJoinPool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors() * 2);

    /**
     * Converts one batch of canal messages into rows, inserts them and triggers a merge
     * for the partitions collected during conversion.
     *
     * @param array canal messages that all belong to this handler's table
     * @return true when the batch was handled (including when it produced no rows);
     *         false when any step threw, so that the caller can queue the batch for retry
     */
    public boolean tableProcess(List<JSONObject> array) {
        try {
            log.info("同步数据:array{}", JSON.toJSONString(array));
            // Business processing; partitionSet is filled by processDataNode.
            Set<String> partitionSet = new HashSet<String>();
            List<T> list = processDataNode(partitionSet, array);
            if (CollectionUtils.isEmpty(list)) {
                return true;
            }
            log.info("insert:size={},list={}", list.size(), JSON.toJSONString(list));
            // Batch insert into ClickHouse.
            insertBatch(list);
            // Trigger the merge for the collected partitions.
            optimizeRun(partitionSet);
            return true;
        } catch (Exception ex) {
            log.error("tableProcess_error:message={}", ex.getMessage(), ex);
        }
        return false;
    }

    // processDataNode, insertBatch and optimizeRun are not part of this excerpt.
processDataNode 方法
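大致逻辑:根据 canal 返回的 JSON 中的 type 字段,判断本条消息是查询(QUERY)、新增、更新(UPDATE)还是删除(DELETE),再对 ClickHouse 做相应处理:查询直接跳过;新增和更新写入 sign=1 的行;删除写入 sign=-1 的行;更新时如果 intime 发生了变化,还要先为旧记录补写一条 sign=-1 的行。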
protected List<ParkingRecord> processDataNode(Set<String> partitionSet, List<JSONObject> array) throws Exception { List<ParkingRecord> finalList = new ArrayList<ParkingRecord>(); for (int i = 0; i < array.size(); i++) { JSONObject json = array.get(i); if (TypeConstant.QUERY.equals(json.getString("type"))) { continue; } int sign = 1; if (TypeConstant.DELETE.equals(json.getString("type"))) { log.info("detail_delete,json={}", json.toJSONString()); sign = -1; } boolean isUpdate = TypeConstant.UPDATE.equals(json.getString("type")); JSONArray data = json.getJSONArray("data"); //1 : 新增或者更新 -1:删除 List<ParkingRecord> subList = JSONObject.parseArray(data.toJSONString(),ParkingRecord.class); for (ParkingRecord parkingRecord : subList) { parkingRecord.setSign(sign); if (isUpdate) { ParkingRecord record = parkingRecordMapper.selectByPrimaryKey(parkingRecord.getId()); if (!record.getIntime().equals(parkingRecord.getIntime())) { log.info("intime is updated,record={}",JSON.toJSONString(record)); record.setSign(-1); finalList.add(record); } } finalList.add(parkingRecord); //加入分区变量 partitionSet.add(threadDateUtil.formatDate(parkingRecord.getIntime())); } } log.info("ParkingRecord,totalCount={},listCount={}", array.size(), finalList.size()); return finalList; }
之后就是测试集群下相关配置等等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。