当前位置:   article > 正文

修改Canal的消费位点_canal binlog 重置position

canal binlog 重置position

修改Canal的消费位点

日常的使用中,当使用 ZooKeeper 作为高可用方案时,因某些原因需要重新采集数据,只要目标库中的 binlog 文件依旧存在,那么就能够将 Canal 的解析位置重新定位,从指定的位置开始解析。
canal instance的配置文件修改无效果,因为zookeeper已经记录binlog日志位置,修改为新的也不会生效,需要在zookeeper配置中修改或者删除重新生成

canal 默认在 ZooKeeper 中存储的节点为:

/otter/canal
  • 1
一、停止canal服务

在有数据被消费时,客户端的每次 Ack 都会使得服务端用本地缓存的位点信息去覆盖 ZooKeeper 上的位点,所以在修改 ZooKeeper 的位点之前,应先避免其不再被覆盖,即停止 Canal 服务。(停止对应的instance,无需停止整个canal服务)

亦可停止该 instance 对应的消费端程序,则不会再触发服务端使用缓存中的位点信息去覆盖 ZooKeeper 中的信息。
  • 1
二、获取位点信息

停止服务后,可通过 zkCli.sh、ZooInspector、zkui 等工具连接到相应的 ZooKeeper 服务器上,并获取到当前的位点信息。
默认情况下,位点信息存储在下述节点中:

/otter/canal/destinations/${destination}/1001/cursor
  • 1

注意:请将 ${destination} 替换成预期的 instance 名称。
若 ZooKeeper 中并无 …/1001/cursor 节点,则表明该 instance 从未被消费过,可通过启动一次对应的消费者,以生成该位点信息,但记得生成位点信息后要将其停止。
本文仅以 zkCli.sh (交互模式)方式示例如何获取名为 “example” 的 instance 的位点信息:

# cd /data/zookeeper/bin
# sh zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ll  (可使用的命令)
ZooKeeper -server host:port cmd args
	stat path [watch]
	set path data [version]
	ls path [watch]
	delquota [-n|-b] path
	ls2 path [watch]
	setAcl path acl
	setquota -n|-b val path
	history 
	redo cmdno
	printwatches on|off
	delete path [version]
	sync path
	listquota path
	rmr path
	get path [watch]
	create [-s] [-e] path data acl
	addauth scheme auth
	quit 
	getAcl path
	close 
	connect host:port
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/cursor  (获取位点信息)
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.069317","position":5963960,"serverId":833045285,"timestamp":1629962234000}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

从上述的结果可看出,位点信息是用 json 格式的字符串进行存储的,里面存放了许多信息,我们仅需要变更“journalName”和“position”两个属性值,分别对应当前的 binlog 文件名以及该文件内的具体位置。

三、查询数据位点

我们既然已经确定了变更位点需要哪些信息,那么又如何获得预期的位点信息呢?这就需要视需求而定了,其实,一般来变更位点都是往当前位置的前面调整,再次采集已同步过的数据。本文仅给出如何在 MySQL 中通过 SQL 语句查询 binlog 的位点信息示例:

1.查询 binlog 文件
# show binary logs (此处的输出结果仅为 binlog 文件名称和大小)
  • 1
2.binlog 事件

binary log,顾名思义就是二进制日志文件,里面存放的信息无法直接被展示出来,但我们可以通过 MySQL 的 SQL 语句将其解析成对应的事件,通过判断事件来找出对应的位点信息。
用于查询 binlog 事件的 SQL 语法如下:

# show binlog events [in 'binlog_name'] [from pos] [limit [offset,] row_count]
  • 1

在查询 binlog 事件时,强烈建议尽可能多地增加限制结果集大小的条件,以降低对 MySQL 的性能损耗,这就是有1. 查询 binlog 文件步骤的原因,为此步骤提供信息。
下述示例表示列出名为“mysql-bin.000008”的文件内从第 4 个位点开始往后的 5 条记录。

show binlog events in 'mysql-bin.000008' from 4 limit 5
  • 1

当前仅能通过判断具体的事件,来确定对应的位点值(Pos)。

四、变更位点信息
1.变更

假设我们需要的目标位点信息是:名为 mysql-bin.000008 的日志文件内位点值为 4。那么通过 zkCli.sh 的 set 方法进行变更,示例如下:

set /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000008","position":4,"serverId":1,"timestamp":1590112037000}}
  • 1

注意:节点名与节点值之间使用一个半角空格进行分隔。

2.删除

或者直接删除掉旧位点信息(如删除掉旧位点信息,需要去instance配置新的位点)

[zk: localhost:2181(CONNECTED) 5] delete /otter/canal/destinations/example/1001/cursor
  • 1
canal.instance.master.journal.name=mysql-bin.070764
canal.instance.master.position=120
  • 1
  • 2
五、启动canal服务

启动 Canal 服务端,以使新的位点生效。
如果之前通过停止消费端的方式来消除位点被覆盖,那么此时就需要通过重启 Canal 服务端将 ZooKeeper 上新的位点信息去覆盖 Canal 服务端缓存中的信息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/911103
推荐阅读
相关标签
  

闽ICP备14008679号