当前位置:   article > 正文

2022-08-10 Flume和Kafka对接_背景:flume将bdp端kafka数据传到udp端kafka存在延迟,其中,flume采用kafk

背景:flume将bdp端kafka数据传到udp端kafka存在延迟,其中,flume采用kafka source

一、对接相关的Flume组件

1. Kafka Source

属性名默认值描述
channels
type应该设置为:org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers连接kafka集群的地址,逗号分隔多个地址
kafka.consumer.group.idflumeKafka Source组件对于Kafka来说是一个消费者的角色,所以需要指定所属的消费者组
kafka.topics要消费的主题有哪些,逗号分隔
kafka.topics.regex使用正则表达式来指定要消费的主题有哪些,该属性会覆盖kafka.topics
batchSize1000Source传输给Channel数据的批次大小
useFlumeEventFormatfalseKafka Source从kafka中读取数据,读取出来的数据是否封装为Flume event的形式;false表示不封装,true表示封装;

Kafka Source 会自动在Event Header中添加属性:timestamp、topic、partition、key。如果不想使用其自动添加timestamp,那么就需要自己在Header中覆盖。

2. Kafka Channel

Kafka Channel的功能很强大,用法有如下三种:

        (1) 当一个普通的缓冲Channel使用;

        (2) 只需要有一个Source和一个Kafka Channel就可以直接向kafka写入消息了;

        (3) 只需要有一个Kafka Channel和一个Sink就可以直接从kafka中读取数据并写出了;

属性名默认值描述
type应该被设置为:org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers连接kafka集群的地址,逗号分隔多个地址
kafka.topicflume-channelChannel使用的主题
kafka.consumer.group.idflume当从Channel中读取数据时,读取数据的Sink相当于是一个消费者,所以需要设置消费者属于的消费者组
parseAsFlumeEventtrue是否使用Flume Event的形式去解析在Channel中存储的数据;如果在Chaneel中存储的数据不是Flume Event形式的,就将该值设置为false
defaultPartitionId

数据进入的分区,不设置表示使用默认分区器决定;

可以指定一个整数,决定消息进入的分区;

channel只能使用一个分区作为缓冲,如果想用多个分区,只能配置多个channel;

kafka.consumer.auto.offset.resetlatestoffset重置,常用选项为:earliest、latest

3. Kafka Sink

属性名默认值描述
type必须为:org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers连接kafka集群的地址,逗号分隔多个地址
kafka.topicdefault-flume-topic

要往kafka集群中的那个主题中写入消息;

如果在Flume Event中header部分设置了topic属性,

那么Sink会根据设置的topic自动将数据放入对应主题中

flumeBatchSize100Sink拉取数据的批次大小
kafka.producer.acks1ack 等级:0、1、-1(all)
useFlumeEventFormatfalse是否使用Flume Event的形式将消息写入到kafka中
defaultPartitionId

数据进入的分区,不设置表示使用默认分区器决定;

可以指定一个整数,决定消息进入的分区;

partitionIdHeader指定一个名称,这个名称必须与Flume event中的Header部分属性名称对应,主要用于决定数据发往哪个分区,Sink会从Header部分找这个属性,找到就发往设定值的分区,没找到就随机发。

Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will used by Kafka to partition the data between the topic partitions. Events with same key will be sent to the same partition. If the key is null, events will be sent to random partitions.

二、案例

1. 需求:

已知某一服务会一直向端口发送网址数据,现在要求收集这些网址,并将其发往至kafka中的websitother主题中,websiteother主题都具有两个分区,将含有baidu的网址存储在0号分区,将含有iqiyi的网址存储在1号分区,其他网址信息都存储在other主题中;

2. 架构图

3. 实现

(1) 引入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flume</groupId>
  4. <artifactId>flume-ng-core</artifactId>
  5. <version>1.9.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.kafka</groupId>
  9. <artifactId>kafka-clients</artifactId>
  10. <version>2.4.1</version>
  11. </dependency>
  12. </dependencies>

(2) 编写WebsiteInterceptor

  1. import org.apache.flume.Context;
  2. import org.apache.flume.Event;
  3. import org.apache.flume.interceptor.Interceptor;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.regex.Matcher;
  8. import java.util.regex.Pattern;
  9. public class WebsiteInterceptor implements Interceptor {
  10. @Override
  11. public void initialize() {
  12. }
  13. @Override
  14. public Event intercept(Event event) {
  15. Map<String, String> headers = event.getHeaders();
  16. String website = new String(event.getBody(), StandardCharsets.UTF_8);
  17. Pattern pattern = Pattern.compile("(baidu|iqiyi)");
  18. Matcher matcher = pattern.matcher(website);
  19. if (matcher.find()){
  20. headers.put("topic", "website");
  21. if (matcher.group().equalsIgnoreCase("baidu")){
  22. headers.put("key", "0");
  23. }else {
  24. headers.put("key", "1");
  25. }
  26. }else {
  27. headers.put("topic", "other");
  28. }
  29. return event;
  30. }
  31. @Override
  32. public List<Event> intercept(List<Event> list) {
  33. for (Event event : list) {
  34. intercept(event);
  35. }
  36. return list;
  37. }
  38. @Override
  39. public void close() {
  40. }
  41. public static class Builder implements Interceptor.Builder{
  42. @Override
  43. public Interceptor build() {
  44. return new WebsiteInterceptor();
  45. }
  46. @Override
  47. public void configure(Context context) {
  48. }
  49. }
  50. }

(3) 编写配置文件

  1. # 配置文件命名为:netcat-kafkasink.conf,放置于$FLUME_HOME/conf-files/netcat-kafkasink目录下
  2. # 配置文件内容为:
  3. # Named
  4. a1.sources = r1
  5. a1.channels = c1
  6. a1.sinks = k1
  7. # Source
  8. # 对r1的配置
  9. a1.sources.r1.type = netcat
  10. a1.sources.r1.bind = hadoop101
  11. a1.sources.r1.port = 6666
  12. # Interceptors
  13. a1.sources.r1.interceptors = i1
  14. a1.sources.r1.interceptors.i1.type = com.fig.flume.interceptor.WebsiteInterceptor$Builder
  15. # Channel Selector
  16. ...
  17. # Channel
  18. # 对c1的配置
  19. a1.channels.c1.type = memory
  20. a1.channels.c1.capacity = 10000
  21. a1.channels.c1.transactionCapacity = 1000
  22. # Sink Processor
  23. ...
  24. # Sink
  25. # 对k1的配置
  26. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  27. a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
  28. a1.sinks.k1.flumeBatchSize = 1000
  29. a1.sinks.k1.kafka.producer.acks = -1
  30. a1.sinks.k1.useFlumeEventFormat = false
  31. # Bind
  32. a1.sources.r1.channels = c1
  33. a1.sinks.k1.channel = c1

(4) 启动 

  1. # 开启三个消费者消费数据
  2. kafka-ops.sh consumer --topic website --group tp --from-beginning
  3. kafka-ops.sh consumer --topic website --group tp --from-beginning
  4. kafka-ops.sh consumer --topic other --group tp --from-beginning
  5. # 先启动agent
  6. flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/conf-files/netcat-kafkasink/netcat-kafkasink.conf -n a1 -Dflume.root.logger=INFO,console
  7. # 模拟向端口发送数据
  8. nc hadoop101 6666

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号