赞
踩
1. Kafka Source
属性名 | 默认值 | 描述 |
channels | ||
type | 应该设置为:org.apache.flume.source.kafka.KafkaSource | |
kafka.bootstrap.servers | 连接kafka集群的地址,逗号分隔多个地址 | |
kafka.consumer.group.id | flume | Kafka Source组件对于Kafka来说是一个消费者的角色,所以需要指定所属的消费者组 |
kafka.topics | 要消费的主题有哪些,逗号分隔 | |
kafka.topics.regex | 使用正则表达式来指定要消费的主题有哪些,该属性会覆盖kafka.topics | |
batchSize | 1000 | Source传输给Channel数据的批次大小 |
useFlumeEventFormat | false | Kafka Source从kafka中读取数据,读取出来的数据是否封装为Flume event的形式;false表示不封装,true表示封装; |
Kafka Source 会自动在Event Header中添加属性:timestamp、topic、partition、key。如果不想使用其自动添加timestamp,那么就需要自己在Header中覆盖。
2. Kafka Channel
Kafka Channel的功能很强大,用法有如下三种:
(1) 当一个普通的缓冲Channel使用;
(2) 只需要有一个Source和一个Kafka Channel就可以直接向kafka写入消息了;
(3) 只需要有一个Kafka Channel和一个Sink就可以直接从kafka中读取数据并写出了;
属性名 | 默认值 | 描述 |
type | 应该被设置为:org.apache.flume.channel.kafka.KafkaChannel | |
kafka.bootstrap.servers | 连接kafka集群的地址,逗号分隔多个地址 | |
kafka.topic | flume-channel | Channel使用的主题 |
kafka.consumer.group.id | flume | 当从Channel中读取数据时,读取数据的Sink相当于是一个消费者,所以需要设置消费者属于的消费者组 |
parseAsFlumeEvent | true | 是否使用Flume Event的形式去解析在Channel中存储的数据;如果在Chaneel中存储的数据不是Flume Event形式的,就将该值设置为false |
defaultPartitionId | 数据进入的分区,不设置表示使用默认分区器决定; 可以指定一个整数,决定消息进入的分区; channel只能使用一个分区作为缓冲,如果想用多个分区,只能配置多个channel; | |
kafka.consumer.auto.offset.reset | latest | offset重置,常用选项为:earliest、latest |
3. Kafka Sink
属性名 | 默认值 | 描述 |
type | 必须为:org.apache.flume.sink.kafka.KafkaSink | |
kafka.bootstrap.servers | 连接kafka集群的地址,逗号分隔多个地址 | |
kafka.topic | default-flume-topic | 要往kafka集群中的那个主题中写入消息; 如果在Flume Event中header部分设置了topic属性, 那么Sink会根据设置的topic自动将数据放入对应主题中 |
flumeBatchSize | 100 | Sink拉取数据的批次大小 |
kafka.producer.acks | 1 | ack 等级:0、1、-1(all) |
useFlumeEventFormat | false | 是否使用Flume Event的形式将消息写入到kafka中 |
defaultPartitionId | 数据进入的分区,不设置表示使用默认分区器决定; 可以指定一个整数,决定消息进入的分区; | |
partitionIdHeader | 指定一个名称,这个名称必须与Flume event中的Header部分属性名称对应,主要用于决定数据发往哪个分区,Sink会从Header部分找这个属性,找到就发往设定值的分区,没找到就随机发。 |
Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.
1. 需求:
已知某一服务会一直向端口发送网址数据,现在要求收集这些网址,并将其发往至kafka中的websit和other主题中,website和other主题都具有两个分区,将含有baidu的网址存储在0号分区,将含有iqiyi的网址存储在1号分区,其他网址信息都存储在other主题中;
2. 架构图
3. 实现
(1) 引入依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.9.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.4.1</version>
- </dependency>
- </dependencies>
(2) 编写WebsiteInterceptor
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
-
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Map;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
- public class WebsiteInterceptor implements Interceptor {
- @Override
- public void initialize() {
-
- }
-
- @Override
- public Event intercept(Event event) {
- Map<String, String> headers = event.getHeaders();
- String website = new String(event.getBody(), StandardCharsets.UTF_8);
- Pattern pattern = Pattern.compile("(baidu|iqiyi)");
- Matcher matcher = pattern.matcher(website);
- if (matcher.find()){
- headers.put("topic", "website");
- if (matcher.group().equalsIgnoreCase("baidu")){
- headers.put("key", "0");
- }else {
- headers.put("key", "1");
- }
- }else {
- headers.put("topic", "other");
- }
- return event;
- }
-
- @Override
- public List<Event> intercept(List<Event> list) {
- for (Event event : list) {
- intercept(event);
- }
- return list;
- }
-
- @Override
- public void close() {
-
- }
-
- public static class Builder implements Interceptor.Builder{
-
- @Override
- public Interceptor build() {
- return new WebsiteInterceptor();
- }
-
- @Override
- public void configure(Context context) {
-
- }
- }
- }
(3) 编写配置文件
- # 配置文件命名为:netcat-kafkasink.conf,放置于$FLUME_HOME/conf-files/netcat-kafkasink目录下
-
- # 配置文件内容为:
- # Named
- a1.sources = r1
- a1.channels = c1
- a1.sinks = k1
-
- # Source
- # 对r1的配置
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = hadoop101
- a1.sources.r1.port = 6666
-
-
- # Interceptors
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = com.fig.flume.interceptor.WebsiteInterceptor$Builder
-
- # Channel Selector
- ...
-
- # Channel
- # 对c1的配置
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
-
-
- # Sink Processor
- ...
-
- # Sink
- # 对k1的配置
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
- a1.sinks.k1.flumeBatchSize = 1000
- a1.sinks.k1.kafka.producer.acks = -1
- a1.sinks.k1.useFlumeEventFormat = false
-
-
- # Bind
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
(4) 启动
- # 开启三个消费者消费数据
- kafka-ops.sh consumer --topic website --group tp --from-beginning
- kafka-ops.sh consumer --topic website --group tp --from-beginning
- kafka-ops.sh consumer --topic other --group tp --from-beginning
-
- # 先启动agent
- flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/netcat-kafkasink/netcat-kafkasink.conf -n a1 -Dflume.root.logger=INFO,console
-
- # 模拟向端口发送数据
- nc hadoop101 6666
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。