
Flume + Kafka: Integrating Flume with Kafka and Routing Data to Different Topics

Integrating Flume with Kafka

Flume is a log-collection component. The main reason for connecting Flume to Kafka is to use Kafka's topic mechanism, which lets consuming nodes be added or removed dynamically; if Flume had to deliver to many downstream nodes directly, it would need a separate channel and sink for each one, which can exhaust the agent's memory.

A typical scenario is therefore: Flume collects log files and publishes them to Kafka, where multiple business lines can consume them.

1) Configure Flume (flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) Start a Kafka consumer
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
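Note that --zookeeper is the old console-consumer option; it was removed in Kafka 2.0, so on newer releases the equivalent command (assuming the same brokers as in the sink configuration) would be:

kafka-console-consumer.sh --bootstrap-server hadoop113:9092 --topic first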
3) From the Flume root directory, start Flume
bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) Start nc and send data

[bd@hadoop113 ~]$ nc localhost 44444
hello
OK
word
OK

The consumer output is as follows:
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
hello
word

Kafka Data Classification

This is based on the following Kafka Sink configuration property:

Property Name | Default | Description
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic, overriding the topic configured here.

If an event header carries a topic field, the event is published to the topic named by that field.

So, after Flume receives an event, an interceptor can add a topic header to it, and the event will be routed to the corresponding topic.

The Flume interceptor is as follows:

package com.starnet.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JudgeTestStringInterceptor implements Interceptor {

    // List that holds the intercepted events
    private List<Event> allEvents;

    public void initialize() {

        // Initialization
        allEvents = new ArrayList<Event>();
    }

    /**
     * Intercept a single event
     * @param event the incoming event
     * @return the event with a "topic" header added
     */
    public Event intercept(Event event) {

        // 1. Get the event headers
        Map<String, String> headers = event.getHeaders();

        // 2. Get the event body
        String body = new String(event.getBody());

        // 3. Decide which header to add based on whether the body contains "test":
        //    if it does, add <topic, first>; otherwise add <topic, second>
        if (body.contains("test")) {
            headers.put("topic", "first");
        } else {
            headers.put("topic", "second");
        }

        return event;
        // Returning null would mark the event as useless and it would be filtered out
    }

    /**
     * Intercept a batch of events
     * @param list the incoming events
     * @return the events, each with a "topic" header added
     */
    public List<Event> intercept(List<Event> list) {

        // 1. Clear the list
        allEvents.clear();

        // 2. Iterate over the events
        for (Event event : list) {
            // 3. Add the header to each event
            allEvents.add(intercept(event));
        }

        return allEvents;
    }

    public void close() {

    }

    // Builder that Flume uses to create the interceptor
    public static class Builder implements Interceptor.Builder {

        public Interceptor build() {

            return new JudgeTestStringInterceptor();
        }

        public void configure(Context context) {

        }
    }
}
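Before an agent that references this class can start, the interceptor has to be packaged and placed on Flume's classpath. The article does not show that step; the following is a minimal sketch, assuming the class sits in a Maven project and a standard Flume installation at $FLUME_HOME (the jar name is illustrative):

# build the interceptor jar
mvn clean package
# copy it into Flume's lib directory so the agent can load the class
cp target/flume-interceptor-1.0.jar $FLUME_HOME/lib/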

The configuration file type-kafka.conf is as follows:

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.starnet.interceptor.JudgeTestStringInterceptor$Builder

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
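Start the agent with this configuration the same way as before (assuming the file is saved under jobs/ like the previous one). If automatic topic creation is disabled on the brokers, the second topic should also be created beforehand; the partition and replication-factor values below are only examples:

kafka-topics.sh --create --zookeeper hadoop113:2181 --partitions 1 --replication-factor 1 --topic second
bin/flume-ng agent -c conf/ -n a1 -f jobs/type-kafka.conf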

After starting Flume, the two consumers, and nc, the results are as follows:

[bd@hadoop113 ~]$ nc localhost 44444
test
OK
hello
OK
word
OK


[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
test

[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic second
hello
word