赞
踩
Debezium 的 Oracle 连接器捕获并记录 Oracle 服务器上的数据库中发生的行级更改,包括连接器运行时添加的表。您可以将连接器配置为针对架构和表的特定子集发出更改事件,或者忽略、屏蔽或截断特定列中的值。
Debezium 通过使用本机 LogMiner 数据库包、XStream API 或 OpenLogReplicator 从 Oracle 获取更改事件。
LogMiner是Oracle数据库中一个用于数据恢复和审计的工具。它可以解析数据库的redo日志和归档日志,提供对数据库中所有操作的详细记录,包括DDL语句、DML语句和事务控制语句等。
LogMiner数据库包是Oracle数据库中提供的一个包,包含了一些常用的LogMiner相关的函数和过程。这些函数和过程可以帮助用户更方便地使用LogMiner进行数据恢复和审计。一些常用的LogMiner数据库包中的函数和过程包括:
通过使用LogMiner数据库包中的这些函数和过程,用户可以更方便地进行数据恢复和审计工作。
XStream API是一个Java库,用于将Java对象序列化为XML格式,或者将XML格式的数据反序列化为Java对象。它可以帮助Java开发人员更方便地在Java应用程序和XML数据之间进行转换。
XStream API可以处理复杂的Java对象,包括继承、泛型、集合、嵌套对象等。它还支持自定义转换器,可以根据开发人员的需求对Java对象进行自定义序列化和反序列化。
XStream API的主要功能包括:
将Java对象序列化为XML格式
将XML格式的数据反序列化为Java对象
支持自定义转换器
支持处理复杂的Java对象
支持忽略某些字段或属性
支持注解方式配置
使用XStream API,可以很方便地将Java对象转换为XML格式,然后在不同的系统之间传输或存储。同时,它也可以将XML格式的数据反序列化为Java对象,方便Java应用程序进行处理和使用。
OpenLogReplicator是一款开源的、基于Java的日志复制工具,可以实时地将MySQL数据库的binlog日志复制到其他的MySQL实例或者其他的数据存储介质,如Kafka、MongoDB、Elasticsearch等。它可以帮助开发人员和系统管理员更方便地进行数据备份、数据复制、数据迁移等工作。
OpenLogReplicator具有以下特点:
要以最佳方式配置和运行 Debezium Oracle 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称、使用元数据以及实现事件缓冲会很有帮助。
通常,Oracle 服务器上的重做日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志中检索数据库的完整历史记录。为了使连接器能够为数据库的当前状态建立基线,连接器第一次启动时,它会执行数据库的初始一致快照。
注意:
以下工作流程列出了 Debezium 创建快照所采取的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为其默认值(初始值)时快照的过程。您可以通过更改 snapshot.mode 属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器将使用此工作流的修改版本来完成快照。
当快照模式设置为默认时,连接器将完成以下任务来创建快照:
1.建立与数据库的连接。
2.确定要捕获的表。默认情况下,连接器捕获所有表,但那些具有将其排除在捕获之外的架构的表除外。快照完成后,连接器将继续传输指定表的数据。如果您希望连接器仅从特定表捕获数据,则可以通过设置 table.include.list 或 table.exclude.list 等属性,指示连接器仅捕获表或表元素子集的数据。
3.在每个捕获的表上获取 ROW SHARE MODE 锁,以防止在创建快照期间发生结构更改。 Debezium 持有这些锁的时间很短。
4.从服务器的重做日志中读取当前系统更改号 (SCN) 位置。
5.捕获所有数据库表或指定捕获的所有表的结构。连接器将架构信息保留在其内部数据库架构历史记录主题中。架构历史记录提供有关发生更改事件时有效的结构的信息。
注意:
6.释放在步骤 3 中获得的锁。其他数据库客户端现在可以写入任何先前锁定的表。
7.在步骤 4 中读取的 SCN 位置,连接器扫描指定用于捕获的表 (SELECT * FROM … AS OF SCN 123)。在扫描期间,连接器完成以下任务:
8.在连接器偏移量中记录快照的成功完成。
生成的初始快照捕获捕获表中每行的当前状态。从该基线状态开始,连接器会捕获发生的后续更改。
快照流程开始后,如果由于连接器故障、重新平衡或其他原因导致该流程中断,则该流程将在连接器重新启动后重新启动。连接器完成初始快照后,它会继续从步骤 3 中读取的位置进行流式传输,以便不会错过任何更新。如果连接器因任何原因再次停止,则在重新启动后,它将从之前停止的位置恢复流式传输更改。
表 1. snapshot.mode 连接器配置属性的设置
设置 | 描述 |
---|---|
always | 在每个连接器启动时执行快照。快照完成后,连接器开始传输事件记录以进行后续数据库更改。 |
initial | 连接器按照创建初始快照的默认工作流程中所述执行数据库快照。快照完成后,连接器开始传输事件记录以进行后续数据库更改。 |
initial_only | 连接器执行数据库快照并在流式传输任何更改事件记录之前停止,不允许捕获任何后续更改事件。 |
schema_only | 连接器捕获所有相关表的结构,执行默认快照工作流程中描述的所有步骤,但它不会创建 READ 事件来表示连接器启动时的数据集(步骤 6)。 |
schema_only_recovery | 连接器捕获所有相关表的结构,执行默认快照工作流程中描述的所有步骤,但它不会创建 READ 事件来表示连接器启动时的数据集(步骤 6)。警告:如果在上次连接器关闭后将架构更改提交到数据库,请勿使用此模式执行快照。 |
连接器运行的初始快照捕获两种类型的信息:
运行初始快照后,您可能会注意到快照捕获未指定捕获的表的架构信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的架构信息,而不仅仅是从指定捕获的表中捕获。连接器要求表的架构存在于架构历史记录主题中,然后才能捕获表。通过启用初始快照来捕获不属于原始捕获集的表的架构数据,Debezium 使连接器做好准备,以便在以后有必要时可以轻松地从这些表中捕获事件数据。如果初始快照未捕获表的架构,则必须先将该架构添加到历史主题,然后连接器才能从表中捕获数据。
在某些情况下,您可能希望限制初始快照中的架构捕获。当您想要减少完成快照所需的时间时,这会很有用。或者,当 Debezium 通过有权访问多个逻辑数据库的用户帐户连接到数据库实例,但您希望连接器仅捕获特定逻辑数据库中的表中的更改时。
附加信息:
在某些情况下,您可能希望连接器从初始快照未捕获其架构的表中捕获数据。根据连接器配置,初始快照可能仅捕获数据库中特定表的表架构。如果历史主题中不存在表架构,连接器将无法捕获该表,并报告缺少架构错误。
您可能仍然能够从表中捕获数据,但必须执行其他步骤来添加表架构。
先决条件
程序
如果将架构更改应用于表,则架构更改之前提交的记录与更改之后提交的记录具有不同的结构。当 Debezium 从表中捕获数据时,它会读取架构历史记录以确保将正确的架构应用于每个事件。如果该架构不存在于架构历史记录主题中,则连接器无法捕获该表,并会产生错误。
如果要从初始快照未捕获的表中捕获数据,并且该表的架构已修改,则必须将该架构添加到历史主题(如果尚不可用)。您可以通过运行新的架构快照或运行表的初始快照来添加架构。
先决条件
程序
初始快照捕获了所有表的架构(store.only.captured.tables.ddl 设置为 false)
初始快照未捕获所有表的架构(store.only.captured.tables.ddl 设置为 true)
如果初始快照未保存要捕获的表的架构,请完成以下过程之一:
流程 1:架构快照,然后是增量快照
程序 2:初始快照,然后是可选的增量快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在此初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流处理传入。
但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。在 Debezium 环境中发生以下任何更改后,您可能需要执行临时快照:
您可以通过启动所谓的临时快照为之前捕获快照的表重新运行快照。特别快照需要使用信令表。您可以通过向 Debezium 信令表发送信号请求来启动临时快照。
当您启动现有表的临时快照时,连接器会将内容追加到该表已存在的主题中。如果删除了先前存在的主题,并且启用了自动主题创建,Debezium 可以自动创建主题。
即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,也可以仅捕获数据库中表的子集。此外,快照可以捕获数据库中表的内容的子集。
您可以通过向信令表发送执行快照消息来指定要捕获的表。将执行快照信号的类型设置为增量或阻塞,并提供要包含在快照中的表的名称,如下表所述:
表 2. 即席执行快照信号记录示例
字段 | 默认值 | 描述 |
---|---|---|
type | incremental | 指定要运行的快照的类型。目前,您可以请求增量快照或阻塞快照。 |
data-collections | N/A | 包含与要快照的表的完全限定名称匹配的正则表达式的数组。名称的格式与 signal.data.collection 配置选项相同。 |
additional-condition | N/A | 可选字符串,指定基于表的列的条件,以捕获表内容的子集。注意:该属性已被弃用。要指定用于定义要快照捕获的数据子集的条件,请使用additional-conditions 参数。 |
additional-conditions | N/A | 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:data-collection过滤器应用到的表的完全限定名称。您可以对每个表应用不同的过滤器。filter指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。您分配给筛选器参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的值类型相同。在早期 Debezium 版本中,没有为快照信号定义显式过滤器参数;相反,过滤条件是由为现已弃用的附加条件参数指定的值隐含的。 |
surrogate-key | N/A | 一个可选字符串,指定连接器在快照过程中用作表主键的列名称。 |
您可以通过向信令表添加具有执行快照信号类型的条目来启动临时增量快照。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。
您可以通过向信令表添加具有执行快照信号类型的条目来启动临时阻塞快照。连接器处理消息后,开始快照操作。连接器暂时停止流式传输,然后启动指定表的快照,遵循初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。
为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依赖 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。
在增量快照中,Debezium 不像初始快照那样一次性捕获数据库的完整状态,而是以一系列可配置块的形式分阶段捕获每个表。您可以指定希望快照捕获的表以及每个块的大小。块大小确定快照在数据库上的每个提取操作期间收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护其捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:
当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示块快照开始时行的值。
随着快照的进行,其他进程可能会继续访问数据库,从而可能修改表记录。为了反映此类更改,INSERT、UPDATE 或 DELETE 操作将照常提交到事务日志。同样,正在进行的 Debezium 流处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。
在某些情况下,流处理发出的 UPDATE 或 DELETE 事件的接收顺序不正确。也就是说,在快照捕获包含该行的 READ 事件的块之前,流处理可能会发出一个修改表行的事件。当快照最终发出该行相应的 READ 事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium 采用缓冲方案来解决冲突。仅当快照事件和流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。
为了帮助解决迟到的 READ 事件和修改同一表行的流式事件之间的冲突,Debezium 采用了所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的时间间隔。在块的快照窗口打开之前,Debezium 会遵循其通常的行为,并将事件从事务日志直接向下游发送到目标 Kafka 主题。但从特定块的快照打开的那一刻起,直到其关闭,Debezium 都会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且更新事务日志以反映每次提交,Debezium 会针对每次更改发出 UPDATE 或 DELETE 操作。
当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传送到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则流式事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的 READ 事件。 Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。
连接器对每个快照块重复该过程。
注意:Oracle 的 Debezium 连接器在增量快照运行时不支持架构更改。
目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。
您将信号作为 SQL INSERT 查询提交到信令表。
Debezium 检测到信令表中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值增量。
要指定要包含在快照中的表,请提供一个列出表的数据集合数组或用于匹配表的正则表达式数组,例如,
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的数据收集数组没有默认值。如果数据收集数组为空,Debezium 会检测到不需要执行任何操作,并且不会执行快照。
注意:
先决条件
使用源信令通道触发增量快照
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<tableName>", "filter": "<additional-condition>"}]}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) (1)
values ('ad-hoc-1', (2)
'execute-snapshot', (3)
'{"data-collections": ["schema1.table1", "schema2.table2"], (4)
"type":"incremental", (5)
"additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6)
命令中的id、type、data参数的取值与信令表的字段相对应。
示例中的参数说明如下表:
表 3. 用于向信令表发送增量快照信号的 SQL 命令中的字段说明
序列 | 值 | 描述 |
---|---|---|
1 | myschema.debezium_signal | 指定源数据库上信令表的完全限定名称。 |
2 | ad-hoc-1 | id 参数指定指定为信号请求的 id 标识符的任意字符串。使用此字符串来标识信令表中条目的日志消息。 Debezium 不使用该字符串。相反,在快照期间,Debezium 会生成自己的 id 字符串作为水印信号。 |
3 | execute-snapshot | 类型参数指定信号要触发的操作。 |
4 | data-collections | 信号数据字段的必需组件,指定表名称或正则表达式的数组,以匹配要包含在快照中的表名称。该数组列出了按完全限定名称匹配表的正则表达式,使用与在 signal.data.collection 配置属性中指定连接器信令表名称相同的格式。 |
5 | incremental | 信号数据字段的可选类型组件,指定要运行的快照操作的类型。 |
目前,唯一有效的选项是默认值增量。如果不指定值,连接器将运行增量快照。 | ||
6 | additional-conditions | 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个具有数据收集和过滤属性的对象。您可以为每个数据集合指定不同的过滤器。* data-collection 属性是要应用过滤器的数据集合的完全限定名称。 |
如果您希望快照仅包含表中内容的子集,您可以通过向快照信号附加附加条件参数来修改信号请求。
典型快照的 SQL 查询采用以下形式:
SELECT * FROM <tableName> ....
通过添加附加条件参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:
SELECT * FROM <data-collection> WHERE <filter> ....
以下示例显示了一个 SQL 查询,用于将带有附加条件的临时增量快照请求发送到信令表:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<tableName>", "filter": "<additional-condition>"}]}');
例如,假设您有一个包含以下列的产品表:
如果您希望products表的增量快照只包含color=blue的数据项,可以使用以下SQL语句触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');
附加条件参数还允许您传递基于多个列的条件。例如,使用上一示例中的产品表,您可以提交一个查询来触发增量快照,该快照仅包含那些 color=blue 且数量>10 的商品的数据:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增量快照事件消息
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" (1)
},
"op":"r", (2)
"ts_ms":"1620393591654",
"transaction":null
}
序列 | 值 | 描述 |
---|---|---|
1 | snapshot | 指定要运行的快照操作的类型。目前,唯一有效的选项是默认值增量。在提交到信令表的 SQL 查询中指定类型值是可选的。如果不指定值,连接器将运行增量快照。 |
2 | op | 指定事件类型。快照事件的值为r,表示READ操作。 |
您可以向配置的 Kafka 主题发送消息,请求连接器运行临时增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为execute-snapshot,数据字段必须有以下字段:
表 4. 执行快照数据字段
序列 | 值 | 描述 |
---|---|---|
type | incremental | 要执行的快照的类型。目前 Debezium 仅支持增量类型。 |
data-collections | N/A | 一组以逗号分隔的正则表达式,与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。 |
additional-condition | N/A | 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。 |
additional-conditions | N/A | 附加条件的可选数组,指定连接器评估的条件以指定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection:: 过滤器应用到的表的完全限定名称。您可以对每个表应用不同的过滤器。 filter:: 指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。您分配给筛选器参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的值类型相同。在早期 Debezium 版本中,没有为快照信号定义显式过滤器参数;相反,过滤条件是由为现已弃用的附加条件参数指定的值隐含的。 |
执行快照 Kafka 消息的示例:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Debezium 使用附加条件字段来选择表内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含附加条件属性时,该属性的数据收集和过滤参数将附加到 SQL 查询中,例如:
SELECT * FROM <data-collection> WHERE <filter> ….
例如,给定一个具有 id(主键)、颜色和品牌列的产品表,如果您希望快照仅包含 color=‘blue’ 的内容,则当您请求快照时,您可以添加附加 -用于过滤内容的条件属性:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
您可以使用additional-conditions 属性来传递基于多列的条件。例如,使用与上例相同的产品表,如果您希望快照仅包含产品表中 color=‘blue’ 和 Brand=‘MyBrand’ 的内容,您可以发送以下请求:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT 查询向表提交停止快照信号。
Debezium 检测到信令表中的变化后,会读取信号,并停止正在进行的增量快照操作。
您提交的查询指定增量快照操作,并且可以选择删除当前运行快照的表。
先决条件
使用源信令通道停止增量快照
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data)
values ('ad-hoc-1',
'stop-snapshot',
'{"data-collections": ["schema1.table1", "schema2.table2"],
"type":"incremental"}');
signal命令中的id、type、data参数的取值与信令表的字段相对应。
示例中的参数说明如下表:
表 5. 用于向信令表发送停止增量快照信号的 SQL 命令中的字段说明
序列 | 值 | 描述 |
---|---|---|
1 | myschema.debezium_signal | 指定源数据库上信令表的完全限定名称。 |
2 | ad-hoc-1 | id 参数指定指定为信号请求的 id 标识符的任意字符串。使用此字符串来标识信令表中条目的日志消息。 Debezium 不使用该字符串。 |
3 | stop-snapshot | 指定类型参数指定信号要触发的操作。 |
4 | data-collections | 信号数据字段的可选组件,指定表名称或正则表达式的数组,以匹配要从快照中删除的表名称。该数组列出了按完全限定名称匹配表的正则表达式,使用与在 signal.data.collection 配置属性中指定连接器信令表名称相同的格式。如果省略数据字段的该组成部分,则该信号将停止正在进行的整个增量快照。 |
5 | incremental | 信号数据字段的必需组成部分,指定要停止的快照操作类型。目前,唯一有效的选项是增量选项。如果不指定类型值,则信号无法停止增量快照。 |
您可以向配置的 Kafka 信令主题发送信号消息以停止即席增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为stop-snapshot,数据字段必须有以下字段:
表 6. 执行快照数据字段
序列 | 值 | 描述 |
---|---|---|
type | incremental | 要执行的快照的类型。目前 Debezium 仅支持增量类型。 |
data-collections | N/A | 一个可选的逗号分隔正则表达式数组,与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。 |
以下示例显示了典型的停止快照 Kafka 消息:
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
为了在管理快照方面提供更大的灵活性,Debezium 包含一个补充的临时快照机制,称为阻塞快照。阻止快照依赖 Debezium 机制向 Debezium 连接器发送信号。
阻塞快照的行为就像初始快照一样,只是您可以在运行时触发它。
在以下情况下,您可能希望运行阻塞快照而不是使用标准初始快照进程:
当您运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循与初始快照期间使用的相同流程。快照完成后,流将恢复。
配置快照
您可以在信号的数据组件中设置以下属性:
例如:
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
发送信号以触发快照的时间与流停止和快照开始的时间之间可能存在延迟。由于此延迟,在快照完成后,连接器可能会发出一些与快照捕获的记录重复的事件记录。
默认情况下,Oracle 连接器将表中发生的所有 INSERT、UPDATE 和 DELETE 操作的更改事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下约定来命名更改事件主题:
以下列表提供了默认名称的组件的定义:
例如,如果fulfillment是服务器名称,inventory是模式名称,并且数据库包含名称为orders、customers和products的表,则Debezium Oracle连接器将事件发送到以下Kafka主题,数据库中的每个表都有一个事件:
fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products
连接器应用类似的命名约定来标记其内部数据库架构历史主题、架构更改主题和事务元数据主题。
如果默认的主题名称不能满足您的需求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。
当数据库客户端查询数据库时,客户端使用数据库的当前架构。但是,数据库架构可以随时更改,这意味着连接器必须能够识别记录每个插入、更新或删除操作时的架构。此外,连接器不一定将当前架构应用于每个事件。如果事件相对较旧,则它可能是在应用当前模式之前记录的。
为了确保正确处理架构更改后发生的事件,Oracle 在重做日志中不仅包含影响数据的行级更改,还包含应用于数据库的 DDL 语句。当连接器在重做日志中遇到这些 DDL 语句时,它会解析它们并更新每个表架构的内存中表示。连接器使用此架构表示来识别每次插入、更新或删除操作时的表结构,并生成适当的更改事件。在单独的数据库架构历史 Kafka 主题中,连接器记录所有 DDL 语句以及每个 DDL 语句在重做日志中出现的位置。
当连接器在崩溃或正常停止后重新启动时,它会从特定位置(即从特定时间点)开始读取重做日志。连接器通过读取数据库模式历史 Kafka 主题并解析重做日志中连接器启动点之前的所有 DDL 语句来重建此时存在的表结构。
此数据库架构历史记录主题是内部的,仅供内部连接器使用。 (可选)连接器还可以将架构更改事件发送到针对消费者应用程序的不同主题。
其他资源
您可以配置 Debezium Oracle 连接器来生成架构更改事件,这些事件描述应用于数据库中的表的结构更改。连接器将架构更改事件写入名为 <serverName> 的 Kafka 主题,其中 topicName 是在 topic.prefix 配置属性中指定的命名空间。
每当 Debezium 从新表流式传输数据或表结构发生更改时,Debezium 都会向架构更改主题发出新消息。
连接器发送到架构更改主题的消息包含有效负载,并且(可选)还包含更改事件消息的架构。
架构更改事件的架构具有以下元素:
示例:Oracle 连接器架构更改主题的架构
以下示例显示了 JSON 格式的典型架构。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.SchemaChangeKey",
"version": 1
},
"payload": {
"databaseName": "inventory"
}
}
架构更改事件消息的负载包括以下元素:
ddl:
databaseName:
tableChanges:
重要的:
默认情况下,连接器使用 ALL_TABLES 数据库视图来标识要存储在架构历史记录主题中的表名称。在该视图中,连接器只能访问连接到数据库的用户帐户可用的表中的数据。
您可以修改设置,以便架构历史主题存储不同的表子集。使用以下方法之一更改主题存储的表集:
当连接器配置为捕获表时,它不仅将表架构更改的历史记录存储在架构更改主题中,而且还存储在内部数据库架构历史主题中。内部数据库架构历史记录主题仅供连接器使用,不适合消费应用程序直接使用。确保需要有关架构更改的通知的应用程序仅使用来自架构更改主题的信息。
重要的:
切勿对数据库架构历史主题进行分区。为了使数据库架构历史主题正常运行,它必须维护连接器向其发出的事件记录的一致的全局顺序。
为了确保主题不会在分区之间拆分,请使用以下方法之一设置主题的分区计数:
示例:发送到 Oracle 连接器架构更改主题的消息
{
"schema": {
...
},
"payload": {
"source": {
"version": "2.5.3.Final",
"connector": "oracle",
"name": "server1",
"ts_ms": 1588252618953,
"snapshot": "true",
"db": "ORCLPDB1",
"schema": "DEBEZIUM",
"table": "CUSTOMERS",
"txId" : null,
"scn" : "1513734",
"commit_scn": "1513754",
"lcr_position" : null,
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"user_name": "user"
},
"ts_ms": 1588252618953, (1)
"databaseName": "ORCLPDB1", (2)
"schemaName": "DEBEZIUM", //
"ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n ( \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n \"FIRST_NAME\" VARCHAR2(255), \n \"LAST_NAME" VARCHAR2(255), \n \"EMAIL\" VARCHAR2(255), \n PRIMARY KEY (\"ID\") ENABLE, \n SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) SEGMENT CREATION IMMEDIATE \n PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n TABLESPACE \"USERS\" ", (3)
"tableChanges": [ (4)
{
"type": "CREATE", (5)
"id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", (6)
"table": { (7)
"defaultCharsetName": null,
"primaryKeyColumnNames": [ (8)
"ID"
],
"columns": [ (9)
{
"name": "ID",
"jdbcType": 2,
"nativeType": null,
"typeName": "NUMBER",
"typeExpression": "NUMBER",
"charsetName": null,
"length": 9,
"scale": 0,
"position": 1,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "FIRST_NAME",
"jdbcType": 12,
"nativeType": null,
"typeName": "VARCHAR2",
"typeExpression": "VARCHAR2",
"charsetName": null,
"length": 255,
"scale": null,
"position": 2,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "LAST_NAME",
"jdbcType": 12,
"nativeType": null,
"typeName": "VARCHAR2",
"typeExpression": "VARCHAR2",
"charsetName": null,
"length": 255,
"scale": null,
"position": 3,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "EMAIL",
"jdbcType": 12,
"nativeType": null,
"typeName": "VARCHAR2",
"typeExpression": "VARCHAR2",
"charsetName": null,
"length": 255,
"scale": null,
"position": 4,
"optional": false,
"autoIncremented": false,
"generated": false
}
],
"attributes": [ (10)
{
"customAttribute": "attributeValue"
}
]
}
}
]
}
}
表 7. 发送到架构更改主题的消息中的字段描述
序列 | 值 | 描述 |
---|---|---|
1 | ts_ms | 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。 |
2 | databaseName,schemaName | 标识包含更改的数据库和架构。 |
3 | ddl | 该字段包含负责架构更改的 DDL。 |
4 | tableChanges | 包含由 DDL 命令生成的架构更改的一项或多项的数组。 |
5 | type | 描述了这种变化。类型可以设置为以下值之一:CREATE:表已创建。ALTER:表已修改。DROP:表已删除。 |
6 | id | 创建、更改或删除的表的完整标识符。在表重命名的情况下,该标识符是 <old>、<new> 表名称的串联。 |
7 | table | 表示应用更改后的表元数据。 |
8 | primaryKeyColumnNames | 组成表主键的列的列表。 |
9 | columns | 已更改表中每列的元数据。 |
10 | attributes | 每个表更改的自定义属性元数据。 |
在连接器发送到架构更改主题的消息中,消息键是包含架构更改的数据库的名称。在以下示例中,有效负载字段包含databaseName键:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.SchemaChangeKey",
"version": 1
},
"payload": {
"databaseName": "ORCLPDB1"
}
}
Debezium 可以生成代表事务元数据边界并丰富数据更改事件消息的事件。
注意:
数据库事务由包含在 BEGIN 和 END 关键字之间的语句块表示。 Debezium 为每个事务中的 BEGIN 和 END 分隔符生成事务边界事件。事务边界事件包含以下字段:
以下示例显示了典型的事务边界消息:
示例:Oracle 连接器事务边界事件
{
"status": "BEGIN",
"id": "5.6.641",
"ts_ms": 1486500577125,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "5.6.641",
"ts_ms": 1486500577691,
"event_count": 2,
"data_collections": [
{
"data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
"event_count": 1
},
{
"data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。
启用事务元数据后,数据消息信封将通过新的事务字段进行丰富。该字段以字段组合的形式提供有关每个事件的信息:
以下示例显示了典型的事务事件消息:
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"transaction": {
"id": "5.6.641",
"total_order": "1",
"data_collection_order": "1"
}
}
更多Debezium技术请参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。