赞
踩
保证安装Flume的Linux服务器的环境变量中有JAVA_HOME
将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/soft目录下,解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下
[wyr@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
修改apache-flume-1.7.0-bin的名称为flume
[wyr@hadoop102 module]$ ll
total 20
drwxrwxr-x. 7 wyr wyr 4096 Jan 31 12:14 apache-flume-1.7.0-bin
drwxr-xr-x. 11 wyr wyr 4096 Jan 31 10:43 hadoop-2.7.2
drwxrwxr-x. 9 wyr wyr 4096 Jan 30 19:27 hive
drwxr-xr-x. 8 wyr wyr 4096 Dec 13 2016 jdk1.8.0_121
drwxr-xr-x. 11 wyr wyr 4096 Jan 29 22:01 zookeeper-3.4.10
[wyr@hadoop102 module]$ mv apache-flume-1.7.0-bin/ flume
[wyr@hadoop102 module]$ ll
drwxrwxr-x. 7 wyr wyr 4096 Jan 31 12:14 flume
drwxr-xr-x. 11 wyr wyr 4096 Jan 31 10:43 hadoop-2.7.2
drwxrwxr-x. 9 wyr wyr 4096 Jan 30 19:27 hive
drwxr-xr-x. 8 wyr wyr 4096 Dec 13 2016 jdk1.8.0_121
drwxr-xr-x. 11 wyr wyr 4096 Jan 29 22:01 zookeeper-3.4.10
[wyr@hadoop102 module]$
将flume配置到系统环境变量中
[wyr@hadoop102 module]$ cd flume
[wyr@hadoop102 flume]$ pwd
/opt/module/flume
[wyr@hadoop102 flume]$ vim /etc/profile
[wyr@hadoop102 flume]$
JAVA_HOME=/opt/module/jdk1.8.0_121
HADOOP_HOME=/opt/module/hadoop-2.7.2
HIVE_HOME=/opt/module/hive
FLUME_HOME=/opt/module/flume
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$FLUME_HOME/bin
export JAVA_HOME HADOOP_HOME HIVE_HOME FLUME_HOME PATH
[wyr@hadoop102 flume]$ source /etc/profile
[wyr@hadoop102 flume]$
[wyr@hadoop102 module]$ xsync.sh flume
[wyr@hadoop102 etc]$ xsync.sh profile
Flume是Java写的系统,所以agent的配置文件的本质是一个Properties文件!格式为 属性名=属性值
Flume根据数据源类型和目的存储类型选择合适的source、sink,编写对应的配置文件即可。
在配置文件中需要编写:
在/opt/module/flume目录下新创建一个文件夹myagests来存放各种不同配置的Agent
[wyr@hadoop102 flume]$ mkdir myagents [wyr@hadoop102 flume]$ ll total 152 drwxr-xr-x. 2 wyr wyr 4096 Jan 31 12:14 bin -rw-r--r--. 1 wyr wyr 77387 Oct 11 2016 CHANGELOG drwxr-xr-x. 2 wyr wyr 4096 Jan 31 12:14 conf -rw-r--r--. 1 wyr wyr 6172 Sep 26 2016 DEVNOTES -rw-r--r--. 1 wyr wyr 2873 Sep 26 2016 doap_Flume.rdf drwxr-xr-x. 10 wyr wyr 4096 Oct 13 2016 docs drwxrwxr-x. 2 wyr wyr 4096 Jan 31 12:14 lib -rw-r--r--. 1 wyr wyr 27625 Oct 13 2016 LICENSE drwxrwxr-x. 2 wyr wyr 4096 Jan 31 12:58 myagents -rw-r--r--. 1 wyr wyr 249 Sep 26 2016 NOTICE -rw-r--r--. 1 wyr wyr 2520 Sep 26 2016 README.md -rw-r--r--. 1 wyr wyr 1585 Oct 11 2016 RELEASE-NOTES drwxrwxr-x. 2 wyr wyr 4096 Jan 31 12:14 tools [wyr@hadoop102 flume]$
创建Agent配置文件
[wyr@hadoop102 flume]$ cd myagents/
[wyr@hadoop102 myagents]$ ll
total 0
[wyr@hadoop102 myagents]$ touch netcatsource-loggersink.conf
[wyr@hadoop102 myagents]$ touch execsource-hdfssink.conf
[wyr@hadoop102 myagents]$ ll
total 0
-rw-rw-r--. 1 wyr wyr 0 Jan 31 13:01 execsource-hdfssink.conf
-rw-rw-r--. 1 wyr wyr 0 Jan 31 13:00 netcatsource-loggersink.conf
[wyr@hadoop102 myagents]$
#一、定义Agent中各个组件的名称:a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔 # r1:表示a1的Source的名称 a1.sources = r1 # k1:表示a1的Sink的名称 a1.sinks = k1 # c1:表示a1的Channel的名称 a1.channels = c1 #二、配置source:组名名.属性名=属性值 # 表示a1的输入源类型为netcat端口类型 a1.sources.r1.type=netcat # 表示a1的监听的主机 a1.sources.r1.bind=hadoop102 # 表示a1的监听的端口号 a1.sources.r1.port=44444 #三、配置chanel:Use a channel which buffers events in memory # 表示a1的channel类型是memory内存型 a1.channels.c1.type=memory # 表示a1的channel总容量10000个event a1.channels.c1.capacity=1000 # 表示a1的channel传输时收集到了1000条event以后再去提交事务 #a1.channels.c1.transactionCapacity = 1000 #四、配置sink # 表示a1的输出目的地是控制台logger类型 a1.sinks.k1.type=logger # 规定每行最大接收字节数 a1.sinks.k1.maxBytesToLog=100 #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! # 表示将r1和c1连接起来 a1.sources.r1.channels=c1 # 表示将k1和c1连接起来 a1.sinks.k1.channel=c1
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
在命令行中加上 -Dflume.root.logger=info,consloe 可以临时指定sink的输出级别、输出位置
[wyr@hadoop102 flume]$ flume-ng agent -c conf/ -n a1 -f myagents/netcatsource-loggersink.conf -Dflume.root.logger=info,console
开启Agent01之后,就开始监控hadoop102节点的44444端口收到的信息
在hadoop103机器上给hadoop102节点的44444端口发送信息:
[wyr@hadoop103 ~]$ nc hadoop102 44444
hello wyr
OK
how are you
OK
fine, thanks
OK
abcdefghijklmnopqrstuvwxyz
OK
hadoop102节点上就会收到:
#一、定义Agent中各个组件的名称:a2是agent的名称,a2中定义了一个叫r1的source,如果有多个,使用空格间隔 a2.sources = r1 a2.sinks = k1 a2.channels = c1 #二、配置source:组名名.属性名=属性值 a2.sources.r1.type=exec a2.sources.r1.command=tail -f /opt/module/hive/logs/hive.log #三、配置chanel:Use a channel which buffers events in memory a2.channels.c1.type=memory a2.channels.c1.capacity=1000 #四、配置sink a2.sinks.k1.type = hdfs #一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果event的header中必须有timestamp=时间戳,需要设置useLocalTimeStamp = true a2.sinks.k1.hdfs.path = hdfs://hadoop103:9000/flume/%Y%m%d/%H/%M #是否使用本地(Agent运行所在的机器)时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #上传后的文件统一添加的前缀 a2.sinks.k1.hdfs.filePrefix = wyr-exec-logs- #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 100 #以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动 #是否将时间戳向下舍 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = minute #以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用! # 每10秒滚动生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 10 #设置每个文件到128M时滚动 a2.sinks.k1.hdfs.rollSize = 134217700 #每写多少个event滚动一次 a2.sinks.k1.hdfs.rollCount = 0 #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! a2.sources.r1.channels=c1 a2.sinks.k1.channel=c1
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
[wyr@hadoop102 flume]$ flume-ng agent -c conf/ -n a2 -f myagents/execsource-hdfssink.conf -Dflume.root.logger=info,console
#一、定义Agent中各个组件的名称:a3是agent的名称,a3中定义了一个叫r1的source,如果有多个,使用空格间隔 a3.sources = r1 a3.sinks = k1 a3.channels = c1 #二、配置source:组名名.属性名=属性值 a3.sources.r1.type=spooldir a3.sources.r1.spoolDir=/home/wyr/flumedir/spooldir #三、配置chanel:Use a channel which buffers events in memory a3.channels.c1.type=memory a3.channels.c1.capacity=1000 #四、配置sink a3.sinks.k1.type = hdfs #一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果event的header中必须有timestamp=时间戳,需要设置useLocalTimeStamp = true a3.sinks.k1.hdfs.path = hdfs://hadoop103:9000/flume/%Y%m%d/%H/%M #是否使用本地(Agent运行所在的机器)时间戳 a3.sinks.k1.hdfs.useLocalTimeStamp = true #上传后的文件统一添加的前缀 a3.sinks.k1.hdfs.filePrefix = wyr-spooldir-logs- #积攒多少个Event才flush到HDFS一次 a3.sinks.k1.hdfs.batchSize = 100 #以不压缩的文本形式保存数据 a3.sinks.k1.hdfs.fileType=DataStream #以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动 #是否将时间戳向下舍 a3.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k1.hdfs.roundUnit = minute #以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用! # 每30秒滚动生成一个新的文件 a3.sinks.k1.hdfs.rollInterval = 30 #设置每个文件到128M时滚动 a3.sinks.k1.hdfs.rollSize = 134217700 #每写多少个event滚动一次 a3.sinks.k1.hdfs.rollCount = 0 #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! a3.sources.r1.channels=c1 a3.sinks.k1.channel=c1
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
[wyr@hadoop102 flume]$ flume-ng agent -n a3 -c conf/ -f myagents/spoolingdirsource-hdfssink.conf -Dflume.root.logger=info,console
flume ng 1.7版本后提供!
常见问题: TailDirSource采集的文件,不能随意重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入完成后,滚动,改名为xxx.log,此时一旦匹配规则可以匹配上述名称,就会发生数据的重复采集!
Taildir Source 可以读取多个文件最新追加写入的内容!
Taildir Source是可靠的,即使flume出现了故障或挂掉。Taildir Source在工作时,会将读取文件的最后的位置记录在一个json文件中,一旦agent重启,会从之前已经记录的位置,继续执行tail操作!
Json文件中,位置是可以修改,修改后,Taildir Source会从修改的位置进行tail操作!如果JSON文件丢失了,此时会重新从每个文件的第一行,重新读取,这会造成数据的重复!
Taildir Source目前只能读文本文件!
必需配置:
#一、定义Agent中各个组件的名称:a4是agent的名称,a4中定义了一个叫r1的source,如果有多个,使用空格间隔 a4.sources = r1 a4.sinks = k1 a4.channels = c1 #二、配置source:组名名.属性名=属性值 a4.sources.r1.type=taildir a4.sources.r1.filegroups=f1 f2 a4.sources.r1.filegroups.f1=/home/wyr/flumedir/taildir/test01 a4.sources.r1.filegroups.f2=/home/wyr/flumedir/taildir/test02 #三、配置chanel:Use a channel which buffers events in memory a4.channels.c1.type=memory a4.channels.c1.capacity=1000 #四、配置sink a4.sinks.k1.type=logger a4.sinks.k1.maxBytesToLog=100 #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! a4.sources.r1.channels=c1 a4.sinks.k1.channel=c1
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
flume-ng agent -n a4 -c conf/ -f myagents/taildirsource-loggersink.conf -Dflume.root.logger=info,console
在/home/wyr/flumedir/taildir目录新建test01文件,并输入文字
在/home/wyr/flumedir/taildir目录新建test02文件
在test02中输入第一行文字:this is test02 file 保存
#一、定义Agent中各个组件的名称:a5是agent的名称,a5中定义了一个叫r1的source,如果有多个,使用空格间隔 # r1:表示a5的Source的名称 a5.sources = r1 # k1:表示a5的Sink的名称 a5.sinks = k1 # c1:表示a5的Channel的名称 a5.channels = c1 #二、配置source:组名名.属性名=属性值 # 表示a5的输入源类型为netcat端口类型 a5.sources.r1.type=netcat # 表示a5的监听的主机 a5.sources.r1.bind=hadoop103 # 表示a5的监听的端口号 a5.sources.r1.port=44444 #三、配置chanel:Use a channel which buffers events in memory # 表示a5的channel类型是memory内存型 a5.channels.c1.type=memory # 表示a5的channel总容量10000个event a5.channels.c1.capacity=1000 # 表示a5的channel传输时收集到了1000条event以后再去提交事务 #a5.channels.c1.transactionCapacity = 1000 #四、配置sink # 表示a5将event序列化之后传给另一个sink a5.sinks.k1.type=avro # 规定sink数据接收主机 a5.sinks.k1.hostname=hadoop102 # 规定sink数据接收端口 a5.sinks.k1.port=33333 #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! # 表示将r1和c1连接起来 a5.sources.r1.channels=c1 # 表示将k1和c1连接起来 a5.sinks.k1.channel=c1
#一、定义Agent中各个组件的名称:a6是agent的名称,a6中定义了一个叫r1的source,如果有多个,使用空格间隔 # r1:表示a6的Source的名称 a6.sources = r1 # k1:表示a6的Sink的名称 a6.sinks = k1 # c1:表示a6的Channel的名称 a6.channels = c1 #二、配置source:组名名.属性名=属性值 # 表示a6的输入源类型为avro序列化event a6.sources.r1.type=avro # 表示a6的监听的主机 a6.sources.r1.bind=hadoop102 # 表示a6的监听的端口号 a6.sources.r1.port=33333 #三、配置chanel:Use a channel which buffers events in memory # 表示a6的channel类型是memory内存型 a6.channels.c1.type=memory # 表示a6的channel总容量10000个event a6.channels.c1.capacity=1000 # 表示a6的channel传输时收集到了1000条event以后再去提交事务 #a6.channels.c1.transactionCapacity = 1000 #四、配置sink # 表示a6的输出目的地是控制台logger类型 a6.sinks.k1.type=logger #五、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! # 表示将r1和c1连接起来 a6.sources.r1.channels=c1 # 表示将k1和c1连接起来 a6.sinks.k1.channel=c1
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
flume-ng agent -n a6 -c conf/ -f myagents/avrosource-loggersink.conf -Dflume.root.logger=info,console
flume-ng agent -n a5 -c conf/ -f myagents/netcatsource-avrosink.conf -Dflume.root.logger=info,console
在hadoop01服务器上输入:
[wyr@hadoop103 ~]$ nc hadoop103 44444
hello0000
OK
hello1111
OK
heoo^H^Hu^H^[[D
OK
hello3333
OK
在hadoop02服务器上收到:
2021-01-31 22:19:10,003 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 30 30 30 30 hello0000 }
2021-01-31 22:19:14,005 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 31 31 31 31 hello1111 }
2021-01-31 22:19:36,011 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6F 6F 08 08 75 08 1B 5B 44 heoo..u..[D }
2021-01-31 22:19:36,085 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 33 33 33 33 hello3333 }
#一、定义Agent中各个组件的名称:agent01是agent的名称, #agent01中定义了一个叫r1的source agent01.sources = r1 #agent01定义了2个sink,如果有多个,使用空格间隔; agent01.sinks = k1 k2 #agent01定义了2个channel,如果有多个,使用空格间隔; agent01.channels = c1 c2 #二、配置source:组名名.属性名=属性值 agent01.sources.r1.type=exec agent01.sources.r1.command=tail -f /home/wyr/flumedir/exectest.txt #三、配置r1的channel选择器 agent01.sources.r1.selector.type = replicating #四、配置chanel agent01.channels.c1.type=memory agent01.channels.c1.capacity=1000 agent01.channels.c2.type=memory agent01.channels.c2.capacity=1000 #五、配置sink agent01.sinks.k1.type=avro agent01.sinks.k1.hostname=hadoop102 agent01.sinks.k1.port=22222 agent01.sinks.k2.type=avro agent01.sinks.k2.hostname=hadoop104 agent01.sinks.k2.port=33333 #六、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent01.sources.r1.channels=c1 c2 agent01.sinks.k1.channel=c1 agent01.sinks.k2.channel=c2
#一、定义Agent中各个组件的名称:agent02是agent的名称 #agent02中定义了一个叫r1的source,如果有多个,使用空格间隔 agent02.sources = r1 # k1:表示agent02的Sink的名称 agent02.sinks = k1 # c1:表示agent02的Channel的名称 agent02.channels = c1 #二、配置source:组名名.属性名=属性值 agent02.sources.r1.type=avro agent02.sources.r1.bind=hadoop102 agent02.sources.r1.port=22222 #三、配置chanel agent02.channels.c1.type=memory agent02.channels.c1.capacity=1000 #四、配置sink agent02.sinks.k1.type=logger #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent02.sources.r1.channels=c1 agent02.sinks.k1.channel=c1
#一、定义Agent中各个组件的名称:agent03是agent的名称 #agent03中定义了一个叫r1的source,如果有多个,使用空格间隔 agent03.sources = r1 # k1:表示agent03的Sink的名称 agent03.sinks = k1 # c1:表示agent03的Channel的名称 agent03.channels = c1 #二、配置source:组名名.属性名=属性值 agent03.sources.r1.type=avro agent03.sources.r1.bind=hadoop104 agent03.sources.r1.port=33333 #三、配置chanel agent03.channels.c1.type=memory agent03.channels.c1.capacity=1000 #四、配置sink agent03.sinks.k1.type=file_roll agent03.sinks.k1.sink.directory=/home/wyr/flumedir/agent3 #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent03.sources.r1.channels=c1 agent03.sinks.k1.channel=c1
先在hadoop104上启动Agent03
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
[wyr@hadoop104 flume]$ flume-ng agent -n agent03 -c conf/ -f myagents/agent03-avrosource-filerollsink-replicating.conf -Dflume.root.logger=info,console
在hadoop102上启动Agent02
[wyr@hadoop102 flume]$ flume-ng agent -n agent02 -c conf/ -f myagents/agent02-avrosource-loggersink-replicating.conf -Dflume.root.logger=info,console
最后在hadoop102上启动Agent01
[wyr@hadoop103 flume]$ flume-ng agent -n agent01 -c conf/ -f myagents/agent01-execsource-avrosink-replicating.conf -Dflume.root.logger=info,console
测试 Agent:在hadoop01服务器/home/wyr/flumedir/execdir目录下创建文件test01
Multiplexing Channel Selector根据evnet header中属性,参考用户自己配置的映射信息,将event发送到指定的channel!
#一、定义Agent中各个组件的名称:agent01是agent的名称, #agent01中定义了一个叫r1的source agent01.sources = r1 #agent01定义了2个sink,如果有多个,使用空格间隔; agent01.sinks = k1 k2 #agent01定义了2个channel,如果有多个,使用空格间隔; agent01.channels = c1 c2 #二、配置source:组名名.属性名=属性值 agent01.sources.r1.type=exec agent01.sources.r1.command=tail -f /home/wyr/flumedir/exectest.txt #三、配置r1的channel选择器 agent01.sources.r1.selector.type = multiplexing agent01.sources.r1.selector.header = state agent01.sources.r1.selector.mapping.CZ = c1 agent01.sources.r1.selector.mapping.US = c2 #使用拦截器为event加上某个header:模拟source中带有不同key-value的数据 agent01.sources.r1.interceptors = i1 agent01.sources.r1.interceptors.i1.type = static agent01.sources.r1.interceptors.i1.key = state agent01.sources.r1.interceptors.i1.value = CZ #四、配置chanel agent01.channels.c1.type=memory agent01.channels.c1.capacity=1000 agent01.channels.c2.type=memory agent01.channels.c2.capacity=1000 #五、配置sink agent01.sinks.k1.type=avro agent01.sinks.k1.hostname=hadoop102 agent01.sinks.k1.port=22222 agent01.sinks.k2.type=avro agent01.sinks.k2.hostname=hadoop104 agent01.sinks.k2.port=33333 #六、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent01.sources.r1.channels=c1 c2 agent01.sinks.k1.channel=c1 agent01.sinks.k2.channel=c2
#一、定义Agent中各个组件的名称:agent02是agent的名称 #agent02中定义了一个叫r1的source,如果有多个,使用空格间隔 agent02.sources = r1 # k1:表示agent02的Sink的名称 agent02.sinks = k1 # c1:表示agent02的Channel的名称 agent02.channels = c1 #二、配置source:组名名.属性名=属性值 agent02.sources.r1.type=avro agent02.sources.r1.bind=hadoop102 agent02.sources.r1.port=22222 #三、配置chanel agent02.channels.c1.type=memory agent02.channels.c1.capacity=1000 #四、配置sink agent02.sinks.k1.type=logger #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent02.sources.r1.channels=c1 agent02.sinks.k1.channel=c1
#一、定义Agent中各个组件的名称:agent03是agent的名称 #agent03中定义了一个叫r1的source,如果有多个,使用空格间隔 agent03.sources = r1 # k1:表示agent03的Sink的名称 agent03.sinks = k1 # c1:表示agent03的Channel的名称 agent03.channels = c1 #二、配置source:组名名.属性名=属性值 agent03.sources.r1.type=avro agent03.sources.r1.bind=hadoop104 agent03.sources.r1.port=33333 #三、配置chanel agent03.channels.c1.type=memory agent03.channels.c1.capacity=1000 #四、配置sink agent03.sinks.k1.type=logger #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent03.sources.r1.channels=c1 agent03.sinks.k1.channel=c1
先在hadoop104上启动Agent03
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
[wyr@hadoop104 flume]$ flume-ng agent -n agent03 -c conf/ -f myagents/agent03-avrosource-loggersink-multiplexing.conf -Dflume.root.logger=info,console
在hadoop102上启动Agent02
[wyr@hadoop102 flume]$ flume-ng agent -n agent02 -c conf/ -f myagents/agent02-avrosource-loggersink-multiplexing.conf -Dflume.root.logger=info,console
最后在hadoop102上启动Agent01
[wyr@hadoop103 flume]$ flume-ng agent -n agent01 -c conf/ -f myagents/agent01-execsource-avrosink-multiplexing.conf -Dflume.root.logger=info,console
测试 Agent:在hadoop01服务器/home/wyr/flumedir/execdir目录下创建文件test01
#一、定义Agent中各个组件的名称:agent01是agent的名称, #agent01中定义了一个叫r1的source agent01.sources = r1 #agent01定义了2个sink,如果有多个,使用空格间隔; agent01.sinks = k1 k2 #agent01定义了1个channel,如果有多个,使用空格间隔; agent01.channels = c1 #二、配置source:组名名.属性名=属性值 agent01.sources.r1.type=exec agent01.sources.r1.command=tail -f /home/wyr/flumedir/exectest.txt #三、配置chanel agent01.channels.c1.type=memory agent01.channels.c1.capacity=1000 #四、配置sink processor agent01.sinkgroups = g1 agent01.sinkgroups.g1.sinks = k1 k2 agent01.sinkgroups.g1.processor.sinks=k1 k2 #Load Balancing Sink Processor agent01.sinkgroups.g1.processor.type = load_balance #Failover Sink Processor #agent01.sinkgroups.g1.processor.type = failover #agent01.sinkgroups.g1.processor.priority.k1=100 #agent01.sinkgroups.g1.processor.priority.k2=90 #五、配置sink agent01.sinks.k1.type=avro agent01.sinks.k1.hostname=hadoop102 agent01.sinks.k1.port=22222 agent01.sinks.k2.type=avro agent01.sinks.k2.hostname=hadoop104 agent01.sinks.k2.port=33333 #六、连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent01.sources.r1.channels=c1 agent01.sinks.k1.channel=c1 agent01.sinks.k2.channel=c1
#一、定义Agent中各个组件的名称:agent02是agent的名称 #agent02中定义了一个叫r1的source,如果有多个,使用空格间隔 agent02.sources = r1 # k1:表示agent02的Sink的名称 agent02.sinks = k1 # c1:表示agent02的Channel的名称 agent02.channels = c1 #二、配置source:组名名.属性名=属性值 agent02.sources.r1.type=avro agent02.sources.r1.bind=hadoop102 agent02.sources.r1.port=22222 #三、配置chanel agent02.channels.c1.type=memory agent02.channels.c1.capacity=1000 #四、配置sink agent02.sinks.k1.type=logger #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent02.sources.r1.channels=c1 agent02.sinks.k1.channel=c1
#一、定义Agent中各个组件的名称:agent03是agent的名称 #agent03中定义了一个叫r1的source,如果有多个,使用空格间隔 agent03.sources = r1 # k1:表示agent03的Sink的名称 agent03.sinks = k1 # c1:表示agent03的Channel的名称 agent03.channels = c1 #二、配置source:组名名.属性名=属性值 agent03.sources.r1.type=avro agent03.sources.r1.bind=hadoop104 agent03.sources.r1.port=33333 #三、配置chanel agent03.channels.c1.type=memory agent03.channels.c1.capacity=1000 #四、配置sink agent03.sinks.k1.type=logger #五、连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据! agent03.sources.r1.channels=c1 agent03.sinks.k1.channel=c1
先在hadoop104上启动Agent03
flume-ng agent -c 其他配置文件所在的目录 -n agent的名称 -f agent配置文件 -Dproperty=value -Dflume.root.logger=info,console
[wyr@hadoop104 flume]$ flume-ng agent -n agent03 -c conf/ -f myagents/agent03-avrosource-loggersink-sinkprocessor.conf -Dflume.root.logger=info,console
在hadoop102上启动Agent02
[wyr@hadoop102 flume]$ flume-ng agent -n agent02 -c conf/ -f myagents/agent02-avrosource-loggersink-sinkprocessor.conf -Dflume.root.logger=info,console
最后在hadoop102上启动Agent01
[wyr@hadoop103 flume]$ flume-ng agent -n agent01 -c conf/ -f myagents/agent01-execsource-avrosink-sinkprocessor.conf -Dflume.root.logger=info,console
测试 Agent:在hadoop01服务器/home/wyr/flumedir/execdir目录下创建文件test01
agentkafka.sources = r1 agentkafka.sinks = k1 agentkafka.channels = c1 # 配置source agentkafka.sources.r1.type = netcat agentkafka.sources.r1.bind = hadoop102 agentkafka.sources.r1.port = 22222 # 配置channel agentkafka.channels.c1.type = memory agentkafka.channels.c1.capacity = 1000 # 配置sink agentkafka.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink agentkafka.sinks.k1.kafka.bootstrap.servers=hadoop103:9092,hadoop102:9092,hadoop104:9092 agentkafka.sinks.k1.kafka.topic=wyrflumekafkatopic01 #将flume 每个event中的body的内容直接写到kafka agentkafka.sinks.k1.useFlumeEventFormat=false # 绑定和连接组件 agentkafka.sources.r1.channels = c1 agentkafka.sinks.k1.channel = c1
启动 Flume 的 Agent
[wyr@hadoop102 flume]$ flume-ng agent -n agentkafka -c conf/ -f myagents/netcatsource-kafkasink.conf -Dflume.root.logger=info,console
在hadoop102机器上向hadoop102服务器的22222端口发送信息
[wyr@hadoop102 ~]$ nc hadoop102 22222
hello kafka
OK
在hadoop104机器上Kafka消费数据
[wyr@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop104:9092 --topic wyrflumekafkatopic01 --from-beginning
hello kafka
agentkafka.sources = r1 agentkafka.sinks = k1 agentkafka.channels = c1 # 配置source agentkafka.sources.r1.type = netcat agentkafka.sources.r1.bind = hadoop102 agentkafka.sources.r1.port = 22222 # 配置拦截器 agentkafka.sources.r1.interceptors = i1 i2 agentkafka.sources.r1.interceptors.i1.type = static agentkafka.sources.r1.interceptors.i1.key = topic agentkafka.sources.r1.interceptors.i1.value = hello agentkafka.sources.r1.interceptors.i2.type = static agentkafka.sources.r1.interceptors.i2.key = key agentkafka.sources.r1.interceptors.i2.value = 1 # 配置channel agentkafka.channels.c1.type = memory agentkafka.channels.c1.capacity = 1000 # 配置sink agentkafka.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink agentkafka.sinks.k1.kafka.bootstrap.servers=hadoop103:9092,hadoop102:9092,hadoop104:9092 agentkafka.sinks.k1.kafka.topic=wyrflumekafkatopic02 agentkafka.sinks.k1.useFlumeEventFormat=true # 绑定和连接组件 agentkafka.sources.r1.channels = c1 agentkafka.sinks.k1.channel = c1
启动 Flume 的 Agent
[wyr@hadoop102 flume]$ flume-ng agent -n agentkafka -c conf/ -f myagents/netcatsource-kafkasink-interceptor.conf -Dflume.root.logger=info,console
在hadoop102机器上向hadoop102服务器的22222端口发送信息
[wyr@hadoop102 ~]$ nc hadoop102 22222
hello kafka
OK
在hadoop104机器上Kafka消费数据
[wyr@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop104:9092 --topic wyrflumekafkatopic02 --from-beginning
hello kafka
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。