Extracting from MySQL with Kafka: how do I pull data from a remote database into Apache Kafka?

Reading a MySQL database into Kafka with Java

I want to build a real-time data pipeline in Apache Kafka. I have a database at a remote location, and that database is continuously updated. Which Kafka Connect API should I use to pull the data from the database and ingest it into a Kafka broker in real time? Later on I would use Kafka Streams and KSQL to run ad-hoc queries and compute metrics.

Any help would be highly appreciated!

Solution

If you want to create a real-time data pipeline you need to use a Change Data Capture (CDC) tool that can stream changes from MySQL. I would suggest Debezium, an open-source distributed platform for change data capture.

Capturing Inserts

When a new record is added to a table, a JSON message similar to the one below will be produced:

{
  "payload":{
    "before":null,
    "after":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"giorgos@abc.com"
    },
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500369632,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":364,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"c",
    "ts_ms":1500369632095
  }
}

The before object is null and the after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.
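As a rough sketch of how such an event could be consumed downstream, the snippet below polls the change topic and extracts the after image with Jackson. The topic name dbserver1.inventory.customers follows Debezium's serverName.database.table convention for the event above, but the exact topic name, broker address, and deserializers are assumptions that depend on your setup.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class InsertEventReader {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");   // assumption: same broker as in the connector config below
            props.put("group.id", "cdc-reader");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            ObjectMapper mapper = new ObjectMapper();
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Debezium topic naming convention: <server.name>.<database>.<table>
                consumer.subscribe(List.of("dbserver1.inventory.customers"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        if (record.value() == null) continue;   // tombstone records carry no payload
                        JsonNode payload = mapper.readTree(record.value()).path("payload");
                        if ("c".equals(payload.path("op").asText())) {
                            // For inserts, "before" is null and "after" holds the new row
                            System.out.println("New row: " + payload.path("after"));
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }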

Capturing Updates

Assuming that the email attribute has been updated, a JSON message similar to the one below will be produced:

{
  "payload":{
    "before":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"giorgos@abc.com"
    },
    "after":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"newEmail@abc.com"
    },
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500369929,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":673,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"u",
    "ts_ms":1500369929464
  }
}

Notice that op is now u, indicating that this was an UPDATE event. The before object shows the row state before the update and the after object captures the current state of the updated row.
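Because both images are present on an update, a consumer can also work out which columns actually changed. The helper below is a hypothetical sketch that compares the two objects field by field; it assumes the event value has already been parsed into Jackson JsonNode objects as in the earlier snippet.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Objects;

    import com.fasterxml.jackson.databind.JsonNode;

    public class ChangeDiff {
        // Returns the names of fields whose value differs between the "before" and "after" images.
        public static List<String> changedFields(JsonNode before, JsonNode after) {
            List<String> changed = new ArrayList<>();
            Iterator<String> fields = after.fieldNames();
            while (fields.hasNext()) {
                String field = fields.next();
                if (!Objects.equals(before.get(field), after.get(field))) {
                    changed.add(field);
                }
            }
            return changed;
        }
    }

For the update event above, changedFields(before, after) would return just ["email"].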

Capturing Deletes

Now assume that the row has been deleted:

{
  "payload":{
    "before":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"newEmail@abc.com"
    },
    "after":null,
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500370394,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":1025,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"d",
    "ts_ms":1500370394589
  }
}

op is now equal to d, indicating a DELETE event. The after attribute is null and the before object contains the row before it was deleted.
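Putting the three event types together, a downstream consumer typically branches on the op field and uses the appropriate image: after for creates and updates, before (or the record key) for deletes. The method below is a minimal, hypothetical sketch of that dispatch; applyUpsert and applyDelete are stand-ins for whatever sink you write to.

    import com.fasterxml.jackson.databind.JsonNode;

    public class ChangeEventDispatcher {

        // Routes a parsed Debezium "payload" node based on its op code.
        public void dispatch(JsonNode payload) {
            String op = payload.path("op").asText();
            switch (op) {
                case "c":                               // insert: only "after" is populated
                case "u":                               // update: "after" holds the current row state
                    applyUpsert(payload.path("after"));
                    break;
                case "d":                               // delete: only "before" is populated
                    applyDelete(payload.path("before"));
                    break;
                default:
                    // "r" (snapshot read) and any other codes are ignored in this sketch
                    break;
            }
        }

        // Hypothetical sink operations; replace with writes to your own store.
        private void applyUpsert(JsonNode row) {
            System.out.println("Upsert: " + row);
        }

        private void applyDelete(JsonNode row) {
            System.out.println("Delete: " + row);
        }
    }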

You can also have a look at the extensive tutorial provided on their website.

EDIT: Example configuration for a MySQL database

{
  "name": "inventory-connector",                                      (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",  (2)
    "database.hostname": "192.168.99.100",                            (3)
    "database.port": "3306",                                          (4)
    "database.user": "debezium",                                      (5)
    "database.password": "dbz",                                       (6)
    "database.server.id": "184054",                                   (7)
    "database.server.name": "fullfillment",                           (8)
    "database.whitelist": "inventory",                                (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092",         (10)
    "database.history.kafka.topic": "dbhistory.fullfillment",         (11)
    "include.schema.changes": "true"                                  (12)
  }
}

(1) The name of our connector when we register it with a Kafka Connect service.
(2) The name of this MySQL connector class.
(3) The address of the MySQL server.
(4) The port number of the MySQL server.
(5) The name of the MySQL user that has the required privileges.
(6) The password for the MySQL user that has the required privileges.
(7) The connector's identifier, which must be unique within the MySQL cluster and is similar to MySQL's server-id configuration property.
(8) The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
(9) A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
(10) The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
(11) The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
(12) The flag specifying that the connector should generate, on the schema change topic named fullfillment, events with the DDL changes that can be used by consumers.
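Once Kafka Connect is running in distributed mode, a connector like the one above is usually registered by POSTing its JSON (without the numbered markers) to the Connect REST API at the /connectors endpoint. The sketch below does this with the JDK's built-in HTTP client; the http://localhost:8083 address and the inventory-connector.json file name are assumptions, so substitute the host, port, and path used by your own Connect worker.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class RegisterConnector {
        public static void main(String[] args) throws Exception {
            // inventory-connector.json holds the configuration shown above, without the (1)..(12) markers
            String connectorConfig = Files.readString(Path.of("inventory-connector.json"));

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8083/connectors"))   // assumption: local Connect worker
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(connectorConfig))
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());

            // 201 Created means the connector was registered; 409 usually means it already exists
            System.out.println(response.statusCode() + " " + response.body());
        }
    }

A plain curl POST to the same /connectors endpoint achieves the same thing; the important part is that the JSON wraps the connector properties under the config key, exactly as in the example above.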
