Kafka is an increasingly widely used messaging system, especially in big data development (real-time data processing and analysis). It integrates systems and decouples applications by having a Producer send messages to the Broker and a Consumer consume messages from the Broker. Kafka Connect, introduced in version 0.9, greatly simplifies the integration of other systems with Kafka: it lets users quickly define and run all kinds of connectors (File, JDBC, HDFS, etc.), which makes bulk import/export of data into and out of Kafka convenient. Kafka Connect's features include:
- a common framework for Kafka connectors, providing a unified integration API
- a REST interface for viewing and managing Kafka connectors
- offset management: Connect commits offsets automatically, so connector developers do not have to worry about this error-prone part themselves
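As a side note, the REST interface mentioned above listens on port 8083 by default in standalone mode. A minimal sketch of querying it once the worker set up below is running (local-file-source is the connector name defined in the configs that follow):
# list the connectors this worker is running
curl http://localhost:8083/connectors
# inspect the state of one connector and its tasks
curl http://localhost:8083/connectors/local-file-source/status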
This example uses two connectors:
- FileStreamSource: reads from test.txt and publishes the lines to the Broker
- FileStreamSink: reads data from the Broker and writes it to the file test.sink.txt
The Source uses the configuration file ${KAFKA_HOME}/config/connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
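Note that file=test.txt is a relative path, resolved against the directory the worker is started from. A minimal sketch that creates the file up front (assuming the worker will be started from ${KAFKA_HOME}):
cd ${KAFKA_HOME}
touch test.txt   # the source connector tails this file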
The Sink uses the configuration file ${KAFKA_HOME}/config/connect-file-sink.properties:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
The standalone Connect worker uses the configuration file ${KAFKA_HOME}/config/connect-standalone.properties:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
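Depending on the broker's auto.create.topics.enable setting, the connect-test topic may need to be created up front; a sketch using the 0.11 tooling:
cd ${KAFKA_HOME}
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test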
Start a standalone Connect worker, loading both the source and the sink connector:
cd ${KAFKA_HOME}
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
In another terminal, start a console consumer to watch the connect-test topic:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
To see changes arrive in the console-consumer, append two lines to test.txt:
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
The console-consumer window then shows:
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
Check test.sink.txt:
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line
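The source connector's position in test.txt is persisted in the offset file configured above (/tmp/connect.offsets), so a restarted worker resumes where it left off. To make the source re-read the file from the beginning (a sketch, for testing only):
# stop the worker first, then remove the offset file and restart
rm /tmp/connect.offsets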