当前位置:   article > 正文

六, Kafka与Flume对接_kafka 前面接flume

kafka 前面接flume

Kafka 对接 Flume

Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于
Flume 的消费者。

一, Flume作为生产者

在这里插入图片描述

1. 启动Kafka集群

#前面章节有提到一键启停脚本的写法
kk start
  • 1
  • 2

2. 启动Kafka消费者

#为了简单, 先使用console-consumer作为消费者
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --topic first
  • 1
  • 2

3. 设置Flume配置文件

先在Flume安装目录的job目录下, 创建flume_to_kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/cal.*
a1.sources.r1.positionFile =/opt/module/flume -1.7.0/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

4. 启动Flume

bin/flume-ng agent -n a1 -c conf/ -f job/flume_to_kafka.conf
  • 1

5. Flume采集数据, Kafka消费数据情况

  • Kafka 消费者启动
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first --from-begining
  • 1

在这里插入图片描述

二, Flume作为消费者

在这里插入图片描述

1. 配置Flume

  • 在flume的安装目录下新建conf文件, kafka-to-flume.conf
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2. 启动Flume

bin/flume-ng agent -c conf/ -n a1 -f job/kafka-to-flume.conf -Dflume.root.logger=INFO,console
  • 1

3. 启动Kafka生产者

bin/kafka-console-producer.sh --broker-list bigdata01:9092 --topic first
  • 1

4. 测试

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/692316
推荐阅读
相关标签
  

闽ICP备14008679号