Flume is a widely used component in big-data development. It can act as a Kafka producer, and it can also act as a Kafka consumer.
# Start the Kafka cluster with the one-click start/stop script described in an earlier chapter
kk start
# For simplicity, use the console consumer as the consumer for now
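# Check that the topic "first" exists (assumed intent: kafka-topics.sh requires an action flag such as --describe or --create)
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic first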
To use Flume as the Kafka producer, first create flume_to_kafka.conf in the job directory under the Flume installation directory:
# 1 Component definitions
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 Source configuration
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/cal.*
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/taildir_position.json

# 3 Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 Sink configuration
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
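The wiring in step 5 is easy to get wrong: a source binds with the plural key `channels`, while a sink binds with the singular key `channel`. As a minimal sketch, the Python snippet below checks that every source and sink in an agent definition points at a declared channel before the agent is started. The helper names `parse_flume_properties` and `check_wiring` are hypothetical and not part of Flume, and the parser handles only plain `key = value` lines.

```python
def parse_flume_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key/value line; ignored in this sketch
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="a1"):
    """Return a list of problems in the source/sink-to-channel wiring."""
    problems = []
    declared = set(props.get(f"{agent}.channels", "").split())
    # Sources use the plural key 'channels' (one source may feed several channels).
    for source in props.get(f"{agent}.sources", "").split():
        bound = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not bound:
            problems.append(f"source {source} has no channels")
        for channel in bound:
            if channel not in declared:
                problems.append(f"source {source} uses undeclared channel {channel}")
    # Sinks use the singular key 'channel' (one sink drains exactly one channel).
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel is None:
            problems.append(f"sink {sink} has no channel")
        elif channel not in declared:
            problems.append(f"sink {sink} uses undeclared channel {channel}")
    return problems


# Abbreviated copy of the agent definition, for illustration only.
SAMPLE = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""

if __name__ == "__main__":
    print(check_wiring(parse_flume_properties(SAMPLE)))
```

An empty list means the wiring is consistent. In practice the text would be read from job/flume_to_kafka.conf rather than from an inline string.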
bin/flume-ng agent -n a1 -c conf/ -f job/flume_to_kafka.conf
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first --from-beginning
To use Flume as the Kafka consumer, create kafka-to-flume.conf in the same job directory:
# 1 Component definitions
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 Source configuration
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# 3 Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4 Sink configuration
a1.sinks.k1.type = logger

# 5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
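The numbers in this configuration are related. The source writes up to `batchSize` events into the channel in one transaction, so `batchSize` should not exceed the channel's `transactionCapacity`, which in turn should not exceed `capacity`. The sketch below checks that ordering for the values used here (50, 100, 1000). The function name `check_sizes` and its default argument values are hypothetical, chosen only for illustration.

```python
def check_sizes(props, agent="a1", source="r1", channel="c1"):
    """Return a list of problems with batch size versus channel sizing."""
    batch = int(props[f"{agent}.sources.{source}.batchSize"])
    txn = int(props[f"{agent}.channels.{channel}.transactionCapacity"])
    cap = int(props[f"{agent}.channels.{channel}.capacity"])

    problems = []
    # One source batch must fit into a single channel transaction.
    if batch > txn:
        problems.append(f"batchSize {batch} exceeds transactionCapacity {txn}")
    # A single transaction must fit into the channel as a whole.
    if txn > cap:
        problems.append(f"transactionCapacity {txn} exceeds capacity {cap}")
    return problems


# Values taken from the kafka-to-flume.conf listing.
KAFKA_TO_FLUME = {
    "a1.sources.r1.batchSize": "50",
    "a1.channels.c1.capacity": "1000",
    "a1.channels.c1.transactionCapacity": "100",
}

if __name__ == "__main__":
    print(check_sizes(KAFKA_TO_FLUME))
```

If `batchSize` were raised to 200 without also raising `transactionCapacity`, the check would report the mismatch before the agent is started.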
bin/flume-ng agent -c conf/ -n a1 -f job/kafka-to-flume.conf -Dflume.root.logger=INFO,console
bin/kafka-console-producer.sh --broker-list bigdata01:9092 --topic first