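# job/taildir-kafka.conf
# Agent a1: TAILDIR source -> Kafka channel. No sink is declared; the Kafka channel itself writes events to the topic.
a1.sources = r1
a1.channels = c1

# Tail a.log and record the read offset in the position file, so a restart resumes where it stopped.
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/installs/flume1.9/job/a.log
a1.sources.r1.positionFile = /opt/installs/flume1.9/job/taildir-kafka.json

# Write each line to topica as a plain message rather than a serialized Flume event.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.channels.c1.kafka.topic = topica
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel.
a1.sources.r1.channels = c1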
Run the command:
flume-ng agent --conf conf --name a1 --conf-file job/taildir-kafka.conf -Dflume.root.logger=INFO,console
Append test data to a.log:
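The original test data is not shown, so here is a minimal sketch of one way to generate some. The helper name `append_test_lines` and the message text are made up for illustration; any lines appended to the tailed file should be picked up by the TAILDIR source.

```shell
# Hypothetical helper: append COUNT numbered test lines to FILE.
# Appending (>>) matters: TAILDIR reads new data added to the end of the file.
append_test_lines() {
  local file="$1"
  local count="${2:-3}"
  local i=1
  while [ "$i" -le "$count" ]; do
    printf 'test message %d\n' "$i" >> "$file"
    i=$((i + 1))
  done
}
```

With the agent running, this could be called as `append_test_lines /opt/installs/flume1.9/job/a.log 5`.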
Consumer:
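The consumer command itself is not shown in the original. One likely way to watch the topic is Kafka's console consumer, sketched below. The function name `consume_topic` is made up for illustration, and the sketch assumes `kafka-console-consumer.sh` is on the `PATH`. The broker list and topic are copied from the channel configuration above.

```shell
# Broker list and topic taken from the Kafka channel settings.
BOOTSTRAP_SERVERS="hadoop11:9092,hadoop12:9092,hadoop13:9092"
TOPIC="topica"

# Hypothetical wrapper: print every message in the topic, starting from the oldest.
# Assumes kafka-console-consumer.sh (shipped in Kafka's bin/ directory) is on PATH.
consume_topic() {
  kafka-console-consumer.sh \
    --bootstrap-server "$BOOTSTRAP_SERVERS" \
    --topic "$TOPIC" \
    --from-beginning
}
```

Running `consume_topic` in a second terminal should print the appended lines as plain text, because `parseAsFlumeEvent = false` keeps the Flume event wrapper out of the message.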
Flume:
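# job/kafka-hdfs.conf
# Agent a1: Kafka source -> memory channel -> HDFS sink.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Read topica as consumer group g1; hand over a batch at 5000 events or after 2000 ms, whichever comes first.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.sources.r1.kafka.topics = topica
a1.sources.r1.kafka.consumer.group.id = g1

# In-memory buffer sized to hold one full batch per transaction.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 5000

# Write plain-text files into one directory per day, using the agent's local time for the date.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.batchSize = 5000
a1.sinks.k1.hdfs.path = hdfs://hadoop11:8020/flume/date=%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
# Roll files by size only (1 MB); time-based and count-based rolling are disabled.
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0

# Bind the source and the sink to the channel.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1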
Run the command:
flume-ng agent --conf conf --name a1 --conf-file job/kafka-hdfs.conf -Dflume.root.logger=INFO,console
Append test data to a.log:
Consumer:
HDFS:
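The HDFS result is not shown in the original. A minimal sketch of how the output could be checked from the command line follows. Both function names are made up for illustration, and the sketch assumes the `hdfs` CLI is on the `PATH` with `hdfs://hadoop11:8020` as the default filesystem.

```shell
# Hypothetical helper: build the directory the sink writes to for a given day
# (defaults to today), mirroring hdfs.path = .../flume/date=%Y-%m-%d.
flume_partition_path() {
  local day="${1:-$(date +%Y-%m-%d)}"
  printf '/flume/date=%s\n' "$day"
}

# Hypothetical helper: list that day's files, then print their contents.
# The glob is quoted so that hdfs, not the local shell, expands it.
show_flume_output() {
  local dir
  dir="$(flume_partition_path "${1:-}")"
  hdfs dfs -ls "$dir" && hdfs dfs -cat "$dir/*"
}
```

Calling `show_flume_output` checks today's directory, and `show_flume_output 2024-01-31` checks a specific day. A file that is still being written normally carries a `.tmp` suffix until it reaches the 1 MB roll size.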