赞
踩
Debezium MySQL 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 是围绕连续的事件消息流而设计的。然而,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的架构,或者,如果您使用架构注册表,则还包含消费者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。
以下 JSON 框架显示了更改事件的基本四个部分。但是,您选择在应用程序中使用的 Kafka Connect 转换器的配置方式决定了这四个部分在更改事件中的表示。仅当您配置转换器来生成模式字段时,模式字段才会处于更改事件中。同样,仅当您配置转换器来生成事件键和事件负载时,事件键和事件负载才会出现在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:
{
"schema": { 1
...
},
"payload": { 2
...
},
"schema": { 3
...
},
"payload": { 4
...
},
}
表 7. 变更事件基本内容概述
序列 | 字段 | 描述 |
---|---|---|
1 | schema | 第一个架构字段是事件键的一部分。它指定了一个 Kafka Connect 架构,该架构描述了事件键的有效负载部分中的内容。换句话说,第一个架构字段描述了已更改表的主键结构,或者如果表没有主键,则描述唯一键的结构。可以通过设置 message.key.columns 连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述由该属性标识的键的结构。 |
2 | payload | 第一个有效负载字段是事件键的一部分。它具有先前架构字段描述的结构,并且包含已更改的行的键。 |
3 | schema | 第二个架构字段是事件值的一部分。它指定 Kafka Connect 架构,该架构描述事件值的有效负载部分中的内容。换句话说,第二个架构描述了已更改的行的结构。通常,此模式包含嵌套模式。 |
4 | payload | 第二个有效负载字段是事件值的一部分。它具有先前架构字段描述的结构,并且包含已更改的行的实际数据。 |
默认情况下,连接器流将事件记录更改为名称与事件的原始表相同的主题。请参阅主题名称。
注意:
更改事件的键包含已更改表的键和已更改行的实际键的架构。连接器创建事件时,架构及其相应的负载都包含已更改表的主键(或唯一约束)中每一列的字段。
考虑以下客户表,后面是该表的更改事件键的示例。
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
捕获客户表更改的每个更改事件都具有相同的事件键架构。只要客户表具有先前的定义,捕获客户表更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:
{
"schema": { 1
"type": "struct",
"name": "mysql-server-1.inventory.customers.Key", 2
"optional": false, 3
"fields": [ 4
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": { 5
"id": 1001
}
}
表8. 变更事件键说明
序列 | 字段 | 描述 |
---|---|---|
1 | schema | 密钥的架构部分指定 Kafka Connect 架构,该架构描述密钥的有效负载部分中的内容。 |
2 | mysql-server-1.inventory.customers.Key | 定义密钥有效负载结构的架构名称。此架构描述了已更改的表的主键结构。键架构名称的格式为连接器名称.数据库名称.表名称.Key。在这个例子中:mysql-server-1 是生成此事件的连接器的名称。inventory 是包含已更改的表的数据库。customer 是已更新的表。 |
3 | optional | 指示事件键是否必须在其负载字段中包含值。在此示例中,需要密钥有效负载中的值。当表没有主键时,键的有效负载字段中的值是可选的。 |
4 | fields | 指定负载中预期的每个字段,包括每个字段的名称、类型以及是否必需。 |
5 | payload | 包含生成此更改事件的行的键。在此示例中,键包含一个值为 1001 的 id 字段。 |
更改事件中的值比键稍微复杂一些。与键一样,值也具有模式部分和有效负载部分。模式部分包含描述有效负载部分的 Envelope 结构的模式,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都具有带有信封结构的值有效负载。
考虑用于显示更改事件键示例的相同示例表:
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
对此表的更改的更改事件的值部分描述为:
以下示例显示连接器为在客户表中创建数据的操作生成的更改事件的值部分:
{
"schema": { 1
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value", 2
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source", 3
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope" 4
},
"payload": { 5
"op": "c", 6
"ts_ms": 1465491411815, 7
"before": null, 8
"after": { 9
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { 10
"version": "2.5.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
}
}
}
表 9. 创建事件值字段的描述
序列 | 字段 | 描述 |
---|---|---|
1 | schema | 值的架构,描述值有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值架构都是相同的。 |
2 | name | 在架构部分中,每个名称字段指定值的有效负载中字段的架构。mysql-server-1.inventory.customers.Value 是负载的 before 和 after 字段的架构。此模式特定于客户表。before 和 after 字段的架构名称的格式为 LogicalName.tableName.Value,这可确保架构名称在数据库中是唯一的。这意味着,当使用 Avro 转换器时,每个逻辑源中每个表的最终 Avro 模式都有其自己的演变和历史。 |
3 | name | io.debezium.connector.mysql.Source 是有效负载源字段的架构。此架构特定于 MySQL 连接器。连接器将其用于它生成的所有事件。 |
4 | name | mysql-server-1.inventory.customers.Envelope 是负载整体结构的架构,其中 mysql-server-1 是连接器名称,inventory 是数据库,customers 是表。 |
5 | payload | 该值是实际数据。这是更改事件提供的信息。事件的 JSON 表示形式可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的架构和有效负载部分。但是,通过使用 Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 | op | 强制字符串,描述导致连接器生成事件的操作类型。在此示例中,c 表示该操作创建了一行。有效值为:c=create u=update d = delete r =read(仅适用于快照) |
7 | ts_ms | 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。 |
8 | before | 一个可选字段,指定事件发生之前行的状态。当 op 字段是 c(表示创建)时(如本例所示),before 字段为空,因为此更改事件是针对新内容的。 |
9 | after | 一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行的 id、first_name、last_name 和 email 列的值。 |
10 | source | 描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:debezium版本,连接器名称,记录事件的 binlog 名称,二进制日志位置,事件内的行,如果事件是快照的一部分,包含新行的数据库和表的名称,创建事件的 MySQL 线程的 ID(仅限非快照),MySQL 服务器 ID(如果可用),数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。 |
示例客户表中更新的更改事件的值与该表的创建事件具有相同的架构。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为更新客户表而生成的事件中的更改事件值的示例:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "2.5.3.Final",
"name": "mysql-server-1",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "u", (4)
"ts_ms": 1465581029523 (5)
}
}
表 10. 更新事件值字段说明
序列 | 字段 | 描述 |
---|---|---|
1 | before | 一个可选字段,指定事件发生之前行的状态。在更新事件值中,before 字段包含每个表列的字段以及数据库提交之前该列中的值。在此示例中,first_name 值为 Anne。 |
2 | after | 一个可选字段,指定事件发生后行的状态。您可以比较前后结构以确定对此行的更新内容。在示例中,first_name 值现在是 Anne Marie。 |
3 | source | 描述事件源元数据的必填字段。源字段结构与创建事件中的字段相同,但有些值不同,例如样本更新事件来自 binlog 中的不同位置。源元数据包括:debezium版本;连接器名称;记录事件的 binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;包含更新行的数据库和表的名称;创建事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。 |
4 | op | 描述操作类型的强制字符串。在更新事件值中,op 字段值为 u,表示该行因更新而更改。 |
5 | 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。 |
注意:
更改行的主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器将发出旧键的 DELETE 事件记录和新(更新的)键的 CREATE 事件记录,以代替 UPDATE 事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
删除更改事件中的值与同一表的创建和更新事件具有相同的架构部分。示例客户表的删除事件中的有效负载部分如下所示:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "2.5.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581902300,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 805,
"row": 0,
"thread": 7,
"query": "DELETE FROM customers WHERE id=1004"
},
"op": "d", (4)
"ts_ms": 1465581902461 (5)
}
}
表 11. 删除事件值字段说明
序列 | 字段 | 描述 |
---|---|---|
1 | before | 可选字段,指定事件发生之前行的状态。在删除事件值中,before 字段包含通过数据库提交删除该行之前的值。 |
2 | after | 可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为空,表示该行不再存在。 |
3 | source | 描述事件源元数据的必填字段。在删除事件值中,源字段结构与同一表的创建和更新事件的源字段结构相同。许多源字段值也是相同的。在删除事件值中,ts_ms 和 pos 字段值以及其他值可能已更改。但删除事件值中的源字段提供相同的元数据:Debezium版本;连接器名称;记录事件的 binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;包含更新行的数据库和表的名称;创建事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。 |
4 | op | 描述操作类型的强制字符串。 op字段值为d,表示该行被删除。 |
5 | ts_ms | 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。 |
删除更改事件记录为使用者提供处理删除该行所需的信息。包含旧值是因为某些消费者可能需要它们才能正确处理删除。
MySQL 连接器事件旨在与 Kafka 日志压缩配合使用。只要保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。
当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除具有相同键的所有早期消息。但是,为了让 Kafka 删除具有相同键的所有消息,消息值必须为 null。为了实现这一点,在 Debezium 的 MySQL 连接器发出删除事件后,连接器会发出一个特殊的逻辑删除事件,该事件具有相同的键但为空值。
截断更改事件表示表已被截断。在这种情况下,消息键为 null,消息值如下所示:
{
"schema": { ... },
"payload": {
"source": { (1)
"version": "2.5.3.Final",
"name": "mysql-server-1",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "t", (2)
"ts_ms": 1465581029523 (3)
}
}
表 12. 截断事件值字段的描述
序列 | 字段 | 描述 |
---|---|---|
1 | source | 描述事件源元数据的必填字段。在截断事件值中,源字段结构与同一表的创建、更新和删除事件相同,提供以下元数据:debezium版本;连接器类型和名称;记录事件的 Binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;数据库和表的名称;截断事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳 |
2 | op | 描述操作类型的强制字符串。 op字段值为t,表示该表被截断。 |
3 | ts_ms | 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。+ 在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。 |
如果单个 TRUNCATE 语句应用于多个表,则每个截断表都会发出一个截断更改事件记录。
请注意,由于截断事件表示对整个表所做的更改并且没有消息键,因此除非您正在处理具有单个分区的主题,否则与表相关的更改事件没有顺序保证(创建、更新等)并截断该表的事件。例如,当从不同分区读取这些事件时,消费者仅在该表的截断事件之后才可以接收更新事件。
更多Debezium技术请参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。