Flume is a log collection component. The main reason for wiring Flume into Kafka is that Kafka topics let you add or remove consuming nodes dynamically; if Flume had to feed multiple downstream nodes directly, it would need a separate channel and sink per node, which can run the agent out of memory.
The resulting scenario is: Flume collects the log files, and Kafka distributes them to multiple business lines.
1) Configure Flume (flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
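The sink publishes to the `first` topic, so unless the brokers auto-create topics, that topic should exist before the agent starts. A minimal sketch using the same Zookeeper-based tooling as the consumer commands in this post (the partition and replication counts are assumptions, not taken from the original setup):

```bash
# Assumed topic creation; adjust partitions/replication to your cluster
kafka-topics.sh --zookeeper hadoop113:2181 --create --topic first \
  --partitions 3 --replication-factor 2
```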
2) Start a Kafka console consumer
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
3) Start the Flume agent
bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) Start nc and send data
[bd@hadoop113 ~]$ nc localhost 44444
hello
OK
word
OK
The results are as follows:
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
hello
word
According to the Kafka Sink configuration documentation:
| Property Name | Default | Description |
|---|---|---|
| kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic overriding the topic configured here. |
That is, if an event carries a `topic` field in its header, the event is sent to the topic named by that field instead of the one configured in the sink.
So once Flume has received an event, an interceptor can add the appropriate `topic` header and thereby route events into different categories.
The Flume interceptor is as follows:
package com.starnet.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JudgeTestStringInterceptor implements Interceptor {

    // List that holds the processed events
    private List<Event> allEvents;

    @Override
    public void initialize() {
        // Initialization
        allEvents = new ArrayList<Event>();
    }

    /**
     * Intercept a single event
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the event headers
        Map<String, String> headers = event.getHeaders();
        // 2. Get the event body
        String body = new String(event.getBody());
        // 3. Decide which header to add depending on whether the body contains "test":
        //    if it does, add <topic, first>; otherwise add <topic, second>
        if (body.contains("test")) {
            headers.put("topic", "first");
        } else {
            headers.put("topic", "second");
        }
        return event; // returning null marks the event as useless and it will be filtered out
    }

    /**
     * Intercept a batch of events
     * @param list
     * @return
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        // 1. Clear the list
        allEvents.clear();
        // 2. Iterate over the events
        for (Event event : list) {
            // 3. Add the header to each event
            allEvents.add(intercept(event));
        }
        return allEvents;
    }

    @Override
    public void close() {
    }

    // Builder used by Flume to create the interceptor
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JudgeTestStringInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
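For Flume to instantiate this class, the compiled interceptor has to be on the agent's classpath. A minimal sketch, assuming a Maven project whose jar is named flume-interceptor-1.0.jar and that FLUME_HOME points at the Flume installation (both names are assumptions for illustration):

```bash
# Package the interceptor and place the jar where Flume can load it
mvn clean package
cp target/flume-interceptor-1.0.jar $FLUME_HOME/lib/
```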
The configuration file type-kafka.conf is as follows:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.starnet.interceptor.JudgeTestStringInterceptor$Builder

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
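Before checking the output, the agent and both console consumers need to be running. A sketch of the startup commands, mirroring the ones used earlier in this post (the jobs/ path and the prior creation of the `second` topic are assumptions):

```bash
# Start the agent with the interceptor configuration
bin/flume-ng agent -c conf/ -n a1 -f jobs/type-kafka.conf

# In two separate terminals, consume from each topic
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic second
```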
After starting Flume, the two consumers, and nc, the results are as follows:
[bd@hadoop113 ~]$ nc localhost 44444
test
OK
hello
OK
word
OK
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
test
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic second
hello
word