KafkaSource, KafkaSink, and KafkaChannel component overview
1) KafkaSource is used to read data from Kafka. To Flume, KafkaSource plays the role of a source; to Kafka, it is a consumer.
2) KafkaSink is used to write data to Kafka. To Flume, KafkaSink plays the role of a sink; to Kafka, it is a producer (a minimal sketch of this role follows this list).
3) KafkaChannel (the Kafka channel is quite powerful: it can be used with only a source or only a sink)
  ① As a regular channel: xxxSource -> KafkaChannel -> xxxSink
  ② To write data into Kafka: xxxSource -> KafkaChannel
  ③ To read data from Kafka: KafkaChannel -> xxxSink
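To make the producer role concrete, here is a minimal sketch of what KafkaSink effectively does: publish each event body to a Kafka topic with a plain Kafka producer. This is only an illustration of the role, not the sink's actual implementation; the topic name pihao and the broker list are taken from the examples below, and the class name is made up for this sketch.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaSinkRoleDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers as in the Flume configurations below
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all is equivalent to the sink setting a1.sinks.k1.kafka.producer.acks = -1
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // KafkaSink sends each Flume event body as a record like this
            producer.send(new ProducerRecord<>("pihao", "hello from a producer"));
        }
    }
}

KafkaSource is the mirror image: it runs a Kafka consumer in the configured consumer group and turns each record it polls into a Flume event.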
Flume as a producer writing data to Kafka
Architecture diagram
Write netcat-flume-kafka.conf
Requirement: send data to the port Flume is listening on; once Flume receives it, it forwards the data to Kafka.
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = pihao
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume command
# Start Flume
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
# In another terminal, start a Kafka consumer to consume the data
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic pihao
Test data
With the Kafka cluster up and a Kafka consumer running to consume the data Flume writes in:
[atguigu@hadoop102 ~]$ nc localhost 6666
OK
hello
OK
pihao
OK
Derived architectures
Multi-topic support in Kafka Sink
Architecture optimization:
Kafka Sink has a useful feature: if an event's headers contain a topic field, the sink sends that message to the corresponding topic instead of the default one. Based on this feature, the architecture becomes much simpler.
Hands-on example:
Data containing "bigdata" goes to the bigdata topic, data containing "java" goes to the java topic, and all other data goes to the other topic. This is implemented with the custom interceptor below.
package com.pihao.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Custom Flume interceptor: routes events to different Kafka topics
 * by setting the "topic" header based on the event body.
 */
public class DataValueInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the headers
        Map<String, String> headers = event.getHeaders();
        // Get the body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Decide the target topic
        if (body.contains("bigdata")) {
            headers.put("topic", "bigdata");
        } else if (body.contains("java")) {
            headers.put("topic", "java");
        }
        // Events without a "topic" header fall back to the sink's default topic
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    /**
     * Builder inner class used by Flume to instantiate the interceptor above.
     */
    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new DataValueInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Write netcat-flume-kafkatopic.conf (package the interceptor above into a jar and place it in $FLUME_HOME/lib before starting the agent)
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.pihao.flume.interceptor.DataValueInterceptor$MyBuilder

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = other
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start a consumer for each of the three topics: bigdata, java, and other
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic bigdata
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic java
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic other
Run the Flume command
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkatopic.conf -n a1 -Dflume.root.logger=INFO,console
Send test data
[atguigu@hadoop102 kafka]$ nc localhost 6666
hellobigdata
OK
java
OK
other
OK
Test passed!
Flume as a consumer reading data from Kafka, using Kafka Source
Architecture diagram
Write kafka-flume-logger.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = pihao
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.batchSize = 100
a1.sources.r1.useFlumeEventFormat = false

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = logger

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run the Flume command
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafka-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Test
Start a Kafka producer, send data to the topic, and check Flume's console output
[atguigu@hadoop102 kafka]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic pihao
>hello
>pihao
>
{ headers:{topic=test, partition=0, offset=0, timestamp=1634223626455}
Note: after Kafka Source fetches data from Kafka, it wraps each event with some extra header information: topic, partition, offset, and timestamp.
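These headers can be read downstream like any other Flume event header, for example in a custom interceptor. Below is a purely illustrative sketch (the class name KafkaHeaderPrintInterceptor is made up for this example, and it follows the same structure as the DataValueInterceptor above); it simply prints the header values that Kafka Source attached:

package com.pihao.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * Illustrative interceptor: prints the headers that Kafka Source adds
 * to each event (topic, partition, offset, timestamp).
 */
public class KafkaHeaderPrintInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Header keys as seen in the logger sink output above
        System.out.println("topic=" + headers.get("topic")
                + ", partition=" + headers.get("partition")
                + ", offset=" + headers.get("offset")
                + ", timestamp=" + headers.get("timestamp"));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    /** Builder used by Flume to create the interceptor. */
    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new KafkaHeaderPrintInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}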
Kafka Channel feeding an xxxSink
A Kafka Channel connected to a Logger Sink, with no source
#kafkachannel-flume-logger.conf

#Named
a1.channels = c1
a1.sinks = k1

#Source

#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = pihao
a1.channels.c1.kafka.consumer.group.id = flume
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
a1.channels.c1.parseAsFlumeEvent = false

#Sink
a1.sinks.k1.type = logger

#Bind
a1.sinks.k1.channel = c1

Run:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafkachannel-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

Then start a producer, send some data, and check Flume's console output. Test passed!
xxxSource feeding a Kafka Channel
A Netcat Source connected to a Kafka Channel, with no sink
#netcat-flume-kafkachannel.conf

#Named
a1.sources = r1
a1.channels = c1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = pihao
a1.channels.c1.parseAsFlumeEvent = false

#Sink

#Bind
a1.sources.r1.channels = c1

Run:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console

Then start a Kafka consumer on the topic (a minimal Java consumer sketch follows), send data with nc localhost 6666, and verify the output. Test OK!
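Any consumer subscribed to the pihao topic works for this test. Besides kafka-console-consumer.sh, here is a minimal Java consumer sketch (the class name and the group id channel-test are chosen for illustration); because parseAsFlumeEvent = false, each record value is the raw line typed into nc:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaChannelConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "channel-test");  // illustrative group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("pihao"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // With parseAsFlumeEvent = false the value is the plain netcat line
                    System.out.println(record.value());
                }
            }
        }
    }
}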
The version of Kafka Eagle used here is kafka-eagle-bin-1.4.5.tar.gz.
Installation steps
# 1) Modify the Kafka startup script
In kafka-server-start.sh, change

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

to

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

Note: after the change, distribute the script to the other nodes before starting Kafka.

# 2) Upload the archive kafka-eagle-bin-1.4.5.tar.gz to /opt/software on the cluster

# 3) Extract it locally
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-1.4.5.tar.gz

# 4) Enter the directory just extracted
[atguigu@hadoop102 kafka-eagle-bin-1.4.5]$ ll
total 82932
-rw-rw-r--. 1 atguigu atguigu 84920710 Aug 13 23:00 kafka-eagle-web-1.4.5-bin.tar.gz

# 5) Extract kafka-eagle-web-1.4.5-bin.tar.gz to /opt/module
[atguigu@hadoop102 kafka-eagle-bin-1.4.5]$ tar -zxvf kafka-eagle-web-1.4.5-bin.tar.gz -C /opt/module/

# 6) Rename the directory
[atguigu@hadoop102 module]$ mv kafka-eagle-web-1.4.5/ eagle

# 7) Give the startup script execute permission
[atguigu@hadoop102 eagle]$ cd bin/
[atguigu@hadoop102 bin]$ ll
total 12
-rw-r--r--. 1 atguigu atguigu 1848 Aug 22 2017 ke.bat
-rw-r--r--. 1 atguigu atguigu 7190 Jul 30 20:12 ke.sh
[atguigu@hadoop102 bin]$ chmod 777 ke.sh

# 8) Edit the configuration file conf/system-config.properties
# Entries of system-config.properties that need to be changed
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
9) Add environment variables
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin
10) Start Kafka Eagle
[atguigu@hadoop102 eagle]$ bin/ke.sh start
... ...
... ...
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.202.102:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
[atguigu@hadoop102 eagle]$
Note: ZooKeeper and Kafka must be running before you start Kafka Eagle.