当前位置:   article > 正文

干货:使用 Kafka connect 同步数据至 Elasticsearch_docker kafka 导入es

docker kafka 导入es

接着上篇安装完 postgresql connect,我们再安装 es connect 就容易多了;

  • 安装 es connector plugins

因为 docker 安装的 connect 容器里没有 es 的 connect plugins,所以我们去 confluent 官网下载(搜索 Kafka Connect Elasticsearch 下载即可)

下载解压后放至 connect 目录(上篇中设置的挂载目录)中,如果不记得将容器目录挂载到哪可通过如下命令查看:

docker inspect 容器id |grep Mounts -A 20

放置完成后重启 connect 容器,并请求如下 http 验证:

get  ip:8093/connector-plugins

  • 创建 es sink connector
post ip:8093/connectors   为何不可为大牛?

{
    "name": "es-sink1",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://ip:9200",
        "connection.username": "elastic",
        "connection.password": "elastic_xdeas",
        "type.name": "_doc",
        "key.ignore": "false",
        "topics": "know.knowledge.formal_new",
        "write.method": "upsert",
        "behavior.on.null.values": "delete",
        "transforms": "key,ExtractFieldObject",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractFieldObject.field": "after"
    }
}
(为何不可为大牛?)
这里的es connector 配置的着重解析一下:
一开始不知道怎么配认证,翻遍了国内外官方/非官方博客文档都没有找到,几乎要放弃了,最后在stackoverflow找到了 https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (强烈吐槽!官方文档能不能详细点!)
key.ignore 如果设置为true,ES里面_id的值会自动生成,这样的话表里某行记录只要一变化,es就会增加一条数据,所以一定要设置为falsetopics:需要订阅的topic,即上篇配置完pg connector后生成的topic;
transforms:数据转换有关;
transforms.key.type和transforms.key.field这里配置的意思是将表中的id作为es里面的文档id;
"transforms.ExtractFieldObject.field": "after"  字段筛选,我们只需要"after"字段的数据,
因为如果没有transforms.ExtractFieldObject.type 和 transforms.ExtractFieldObject.field的配置,其他的一些无关紧要的元数据也会进入es,索引里数据会是下面这样:
(再次吐槽官方文档,这里也是花了很多时间才摸索这试出来,太难了)
"payload":{"before":null,
"after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"},
"source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null},
"op":"u","ts_ms":1591006642869,"transaction":null}}

验证:获取所有的connectors:get ip:8093/connectors/
  • 同步验证 

     如上述操作没问题,修改表数据,能看到 es 中自动创建了索引并将最新数据同步了过来,索引名即对应上步配置的 topics :know.knowledge.formal_new

总结:kafka connector 是 kafka 内置的数据传输工具,上文我们创建了一个 postgresql connector(依赖 debezium 的 PostgresConnector)其实就是等价于我们在 kafka 的 config 目录中添加了一个 connect-file-source.properties 配置文件(source 代表数据来源);这里我们创建的 es sink connector 等价于在 config 目录添加了一个 connect-file-sink.properties 配置文件(sink 代表数据输出);这里采用 docker 和 api 管理 kafka 的 connector 就显得方便多了;

转发自上面链接: 干货:使用Kafka connect 同步数据至Elasticsearch - 为何不可1995的个人空间 - OSCHINA - 中文开源技术交流社区

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/78564
推荐阅读
相关标签
  

闽ICP备14008679号