
Flume, Lecture 5: Kafka Source (commonly used)

How it works
A Kafka consumer connects to the Kafka cluster, reads messages, converts each one into a Flume event, and writes the events into the channel.

Key parameters
type: org.apache.flume.source.kafka.KafkaSource (the fully qualified class name)
kafka.bootstrap.servers: comma-separated list of Kafka broker addresses
kafka.topics: comma-separated list of topics to consume
kafka.topics.regex: a regular expression selecting a set of topics (overrides kafka.topics if both are set)
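As a sketch of the regex option, the following line would subscribe the source to every topic whose name starts with doit (the topic-name prefix here is an assumption, not from the original):

```
a1.sources.s1.kafka.topics.regex = ^doit.*$
```

New topics that match the pattern are picked up automatically as the consumer refreshes its subscription.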

Create a configuration file on the VM

[root@doit02 agent]# vi kafka-mem-logger.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
# maximum number of messages written to the channel in one batch
a1.sources.s1.batchSize = 100
# maximum time (ms) to wait before flushing a batch, even if it is not full
a1.sources.s1.batchDurationMillis = 2000
# Kafka broker list
a1.sources.s1.kafka.bootstrap.servers = doit01:9092,doit02:9092,doit03:9092
# topic to consume
a1.sources.s1.kafka.topics = doit13

a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# maximum number of events per channel transaction (must not exceed capacity)
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Start the ZooKeeper cluster

Start the Kafka cluster

Create a topic named doit13

Write some data into topic doit13
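The topic can be created with the kafka-topics tool. A sketch of the command, assuming a Kafka 2.2-era install: the ZooKeeper address, partition count, and replication factor below are assumptions (the logger output later shows partitions 0–3, so 4 partitions is consistent):

```shell
# create topic doit13 with 4 partitions, replicated twice
bin/kafka-topics.sh --create --zookeeper doit01:2181 \
  --topic doit13 --partitions 4 --replication-factor 2
```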

[root@doit01 kafka_2.11-2.2.2]# bin/kafka-console-producer.sh --broker-list doit01:9092,doit02:9092,doit03:9092,doit04:9092 --topic doit13
>dafeige
>gafeigege
>shisijun
>shisijun
>shizijun
>shizijun
>laoganbu
>laoganbu 

Start the Flume agent

[root@doit02 flume-1.9.0-bin]# bin/flume-ng agent -n a1 -c conf -f \
agent/kafka-mem-logger.conf -Dflume.root.logger=INFO,console

The console shows the consumed events:

2020-04-22 00:21:23,352 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=3, offset=0, timestamp=1587486078664} body: 73 68 69 73 69 6A 75 6E                         shisijun }
2020-04-22 00:21:23,352 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=0, offset=0, timestamp=1587486078665} body: 73 68 69 73 69 6A 75 6E                         shisijun }
2020-04-22 00:21:23,353 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=2, offset=1, timestamp=1587486079705} body: 73 68 69 7A 69 6A 75 6E                         shizijun }
2020-04-22 00:21:23,353 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=1, offset=1, timestamp=1587486081475} body: 73 68 69 7A 69 6A 75 6E                         shizijun }
2020-04-22 00:21:27,664 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=0, offset=1, timestamp=1587486085672} body: 6C 61 6F 67 61 6E 62 75                         laoganbu }
2020-04-22 00:21:29,667 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{topic=doit13, partition=3, offset=1, timestamp=1587486088944} body: 6C 61 6F 67 61 6E 62 75 20                      laoganbu  }
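Each logged event carries the message bytes as hex in its body; the hex can be decoded back to the original text with xxd (plain shell, independent of Flume):

```shell
# decode the hex body of the first event back to its original text
echo "73 68 69 73 69 6A 75 6E" | xxd -r -p; echo
# prints: shisijun
```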