赞
踩
接着上篇安装完 postgresql connect,我们再安装 es connect 就容易多了;
因为 docker 安装的 connect 容器里没有 es 的 connect plugins,所以我们去 confluent 官网下载(搜索 Kafka Connect Elasticsearch 下载即可)
下载解压后放至 connect 目录(上篇中设置的挂载目录)中,如果不记得将容器目录挂载到哪可通过如下命令查看:
docker inspect 容器id |grep Mounts -A 20
放置完成后重启 connect 容器,并请求如下 http 验证:
get ip:8093/connector-plugins
post ip:8093/connectors 为何不可为大牛?
{
"name": "es-sink1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://ip:9200",
"connection.username": "elastic",
"connection.password": "elastic_xdeas",
"type.name": "_doc",
"key.ignore": "false",
"topics": "know.knowledge.formal_new",
"write.method": "upsert",
"behavior.on.null.values": "delete",
"transforms": "key,ExtractFieldObject",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldObject.field": "after"
}
}
(为何不可为大牛?)
这里的es connector 配置的着重解析一下:
一开始不知道怎么配认证,翻遍了国内外官方/非官方博客文档都没有找到,几乎要放弃了,最后在stackoverflow找到了 https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (强烈吐槽!官方文档能不能详细点!)
key.ignore 如果设置为true,ES里面_id的值会自动生成,这样的话表里某行记录只要一变化,es就会增加一条数据,所以一定要设置为false;
topics:需要订阅的topic,即上篇配置完pg connector后生成的topic;
transforms:数据转换有关;
transforms.key.type和transforms.key.field这里配置的意思是将表中的id作为es里面的文档id;
"transforms.ExtractFieldObject.field": "after" 字段筛选,我们只需要"after"字段的数据,
因为如果没有transforms.ExtractFieldObject.type 和 transforms.ExtractFieldObject.field的配置,其他的一些无关紧要的元数据也会进入es,索引里数据会是下面这样:
(再次吐槽官方文档,这里也是花了很多时间才摸索这试出来,太难了)
"payload":{"before":null,
"after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"},
"source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null},
"op":"u","ts_ms":1591006642869,"transaction":null}}
验证:获取所有的connectors:get ip:8093/connectors/
如上述操作没问题,修改表数据,能看到 es 中自动创建了索引并将最新数据同步了过来,索引名即对应上步配置的 topics :know.knowledge.formal_new
总结:kafka connector 是 kafka 内置的数据传输工具,上文我们创建了一个 postgresql connector(依赖 debezium 的 PostgresConnector)其实就是等价于我们在 kafka 的 config 目录中添加了一个 connect-file-source.properties 配置文件(source 代表数据来源);这里我们创建的 es sink connector 等价于在 config 目录添加了一个 connect-file-sink.properties 配置文件(sink 代表数据输出);这里采用 docker 和 api 管理 kafka 的 connector 就显得方便多了;
转发自上面链接: 干货:使用Kafka connect 同步数据至Elasticsearch - 为何不可1995的个人空间 - OSCHINA - 中文开源技术交流社区
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。