
Kafka-Connect in Practice


1. Introduction to Kafka-Connect

  Kafka is an increasingly widely used messaging system, especially in big data development (real-time data processing and analysis). To integrate with other systems and to decouple applications:

  • Traditionally, a Producer is used to send messages to the Broker, and a Consumer is used to consume messages from the Broker.
  • Kafka Connect, introduced in version 0.9, greatly simplifies the integration of other systems with Kafka. It lets users quickly define and implement various connectors (File, JDBC, HDFS, etc.), which makes bulk data import into and export out of Kafka convenient.

1.1 Kafka Connect features include:

  • A common framework for Kafka connectors, providing a unified integration API
  • Support for both distributed and standalone modes
  • A REST interface for viewing and managing Kafka connectors (see the curl sketch after this list)
  • Automatic offset management, so developers do not have to worry about getting this error-prone part wrong themselves
  • Distributed and scalable
  • Streaming/batch integration
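
As a quick illustration of the REST interface mentioned above: once a worker is running, its connectors can be inspected and managed over HTTP. A minimal sketch, assuming the default REST port 8083 and the local-file-source connector configured later in this article:

# list all connectors registered on this worker
curl http://localhost:8083/connectors
# show the running status of one connector and its tasks
curl http://localhost:8083/connectors/local-file-source/status
# show that connector's configuration
curl http://localhost:8083/connectors/local-file-source/config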

2. Kafka-Connect in Practice

2.1 File Connector Test

This example uses two connectors:

  • FileStreamSource: reads from test.txt and publishes the lines to the Broker
  • FileStreamSink: reads data from the Broker and writes it to the file test.sink.txt
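
The data flow in this example is therefore:

test.txt → FileStreamSource → Broker (topic connect-test) → FileStreamSink → test.sink.txt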

2.1.1 Configuring the Source

The Source uses the configuration file ${KAFKA_HOME}/config/connect-file-source.properties:

# unique name for this connector instance
name=local-file-source
# connector implementation to run
connector.class=FileStreamSource
# maximum number of tasks to create for this connector
tasks.max=1
# file to read lines from
file=test.txt
# topic to publish the lines to
topic=connect-test

2.1.2 Configuring the Sink

The Sink uses the configuration file ${KAFKA_HOME}/config/connect-file-sink.properties:

# unique name for this connector instance
name=local-file-sink
# connector implementation to run
connector.class=FileStreamSink
# maximum number of tasks to create for this connector
tasks.max=1
# file to write consumed records to
file=test.sink.txt
# topic(s) to consume from (note: sink connectors use "topics", plural)
topics=connect-test

2.1.3 Configuring the standalone worker

The standalone Connect worker (the process that hosts the connectors and communicates with the Broker) uses the configuration file ${KAFKA_HOME}/config/connect-standalone.properties:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
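
The converter settings above determine how records are serialized on the topic. With JsonConverter and schemas.enable=true (the configuration used here), every record is wrapped in a schema/payload envelope, which is exactly what the console consumer shows in section 2.1.6. For comparison, with schemas.enable=false the same line would be written as a bare JSON string:

# schemas.enable=true (used in this article):
{"schema":{"type":"string","optional":false},"payload":"first line"}
# schemas.enable=false would instead yield:
"first line"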

2.1.4 Starting the Source and Sink connectors

cd ${KAFKA_HOME}
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 
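
The standalone worker takes the connector properties files directly on the command line, as above. In distributed mode, by contrast, the worker is started on its own and connectors are then submitted through the REST interface. A minimal sketch, assuming the default REST port 8083 and the same file-source settings as above:

./bin/connect-distributed.sh config/connect-distributed.properties

curl -X POST -H "Content-Type: application/json" \
     --data '{"name":"local-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test"}}' \
     http://localhost:8083/connectors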

2.1.5 Opening a console consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
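
The --zookeeper flag matches the old consumer in the 0.11 release used here. On newer Kafka versions, where the ZooKeeper-based consumer has been removed, the equivalent command uses the Broker address directly:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-test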

2.1.6 Writing to test.txt and watching the console consumer

[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt

The console consumer window shows the following output:

{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
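
Since schemas.enable=true wraps each line in an envelope, the original file content sits under the payload field. If jq happens to be installed, a convenient way to strip the envelope while consuming (an optional extra, not part of the original walkthrough):

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test | jq -r .payload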

Check test.sink.txt:

[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt 
firest line
second line
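
Since the sink connector writes out exactly the payload of each record, the two files should end up identical once the connector has flushed; a quick check:

diff test.txt test.sink.txt
# no output means the files match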

Reference
https://www.cnblogs.com/videring/articles/6371081.html
