赞
踩
Apache Flume是一种分布式、可靠和可用的系统,用于高效收集、聚合,以及将大量日志数据从许多不同的来源移动到集中式数据存储上。使用Apache Flume不仅限于日志数据的聚合。由于数据源是可定制的,因此可以使用Flume来传输大量的事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和其他数据源。
Flume使用两个独立的事务负责从Source到Channel及从Channel到Sink的事件传递。Channel中的File Channel具有持久性,事件写入File Channel后,即使Agent重新启动,事件也不会丢失。Flume中还提供了一种Memory Channel的方式,但它不具有持久存储的能力,但是与File Channel相比,MemoryChannel的优点是具有较高的吞吐量。
Flume的主要组件有Event、Client、Agent、Source、Channel和Sink等。
1、下载
wget https://mirrors.bfsu.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
2、解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ../servers/
3、设置环境变量
vim /etc/profile
增加如下配置:
export FLUME_HOME=/export/servers/apache-flume-1.9.0-bin
export PATH=:$FLUME_HOME/bin:$PATH
刷新配置
source /etc/profile
1、创建文件目录
#作为数据源
mkdir /export/source
#作为输出目录
mkdir /export/dist
2、创建配置文件
配置文件是实现Flume数据采集的核心。这里主要配置采集源和输出目录等信息,如下所示:
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置 Source a1.sources.r1.type =spooldir a1.sources.r1.spoolDir=/export/source #配置Sink a1.sinks.k1.type =file_roll a1.sinks.k1.sink.directory=/export/dist # 设置Channel类型为Memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把 Source 和 Sink 绑到 Channel上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3、启动Flume代理
bin/flume-ng agent -n a1 -c conf -f ./job/job1.conf -Dflume.root.logger=INFO,console
4、测试
启动成功后,在source目录下,创建test.txt文件,然后会看到控制台打印如下内容:
这个时候,source目录下的文件变成了test.txt.COMPLETED,说明读取成功了。
1、编写配置文件
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置 Source a1.sources.r1.type =spooldir a1.sources.r1.spoolDir=/export/source # 配置 Sink a1.sinks.k1.type =hdfs #按照%Y-%m-%d/%H%M格式分开存储文件 a1.sinks.k1.hdfs.path=hdfs://node01:8020/flume/data/%Y-%m-%d/%H%M a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollSize=10240000 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=3 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1.hdfs.roundUnit=minute a1.sinks.k1.hdfs.useLocalTimeStamp=true #a1.sinks.k1.type =hdfs # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定 Source 和 Sink 到 Channel 上 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
其中,a1.sinks.k1.hdfs.path的值,根据自己的配置填写。
2、启动Flume代理
bin/flume-ng agent -n a1 -c conf -f ./job/job2.conf -Dflume.root.logger=INFO,console
3、测试
启动成功后,在source目录下,创建t1.txt文件。然后访问hadoop的hdfs,比如:http://192.168.1.8:50070/explorer.html#/flume/data,就可以看见上传到hdfs的文件。
a1.sources = logSource a1.channels = fileChannel a1.sinks = hdfsSink #指定Source的类型是exec a1.sources.logSource.type = exec #指定命令是tial -F,持续监测/export/dist/test.txt中的数据 a1.sources.logSource.command = tail -F /export/dist/test.txt # 将Channel设置为fileChannel a1.sources.logSource.channels = fileChannel # 设置Sink为HDFS a1.sinks.hdfsSink.type = hdfs #文件生成的时间 a1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/flume/record/%Y-%m-%d/%H%M a1.sinks.hdfsSink.hdfs.filePrefix= transaction_log a1.sinks.hdfsSink.hdfs.rollInterval= 600 a1.sinks.hdfsSink.hdfs.rollCount= 10000 a1.sinks.hdfsSink.hdfs.rollSize= 0 a1.sinks.hdfsSink.hdfs.round = true a1.sinks.hdfsSink.hdfs.roundValue = 10 a1.sinks.hdfsSink.hdfs.roundUnit = minute a1.sinks.hdfsSink.hdfs.fileType = DataStream a1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true #Specify the channel the sink should use a1.sinks.hdfsSink.channel = fileChannel # 设置 Channel的类型为file,并设置断点目录和channel数据存放目录 a1.channels.fileChannel.type = file a1.channels.fileChannel.checkpointDir= /export/flume/dataCheckpointDir a1.channels.fileChannel.dataDirs= /export/flume/dataDir
2、启动Flume代理
bin/flume-ng agent -n a1 -c conf -f ./job/job3.conf -Dflume.root.logger=INFO,console
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。