赞
踩
目录
1)在job目录下创建文件:file-flume-hdfs.conf
3)开启Hadoop 和 Hive 并操作 Hive 产生日志
3.1、需求:使用Flume监听整个目录的文件,并上传到HDFS
3.3.1、在job目录下创建配置文件dir-flume-hdfs.conf,并添加以下内容
3.3.3、向 upload 文件夹中添加文件,在/opt/module/flume-1.9.0/ 目录下创建 upload 文件夹,并添加以下文件
4.1、需求:使用Flume监听整个目录的实时追加文件,并上传到HDFS
4.3.1、在job目录下创建配置文件taildir-flume-hdfs.conf,并添加以下内容
4.3.3、向files文件加追加内容,在/opt/module/flume-1.9.0目录下创建files和files2文件夹,向该文件夹中添加文件
(1) Flume 官网地址http://flume.apache.org/
(2)文档查看地址http://flume.apache.org/FlumeUserGuide.html
(3)下载地址http://archive.apache.org/dist/flume/
(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下。
(2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下。
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3)修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0
(4) 将 flume-1.9.0/conf 下 的 flume-env.sh.template 文 件 修 改 为 flume-env.sh , 并配 置flume-env.sh 文件:
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_271
(5) 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
[kgf@hadoop104 flume-1.9.0]$ rm -rf /opt/module/flume-1.9.0/lib/guava-11.0.2.jar
使用Flume监听一个端口,收集该端口数据,并打印到控制台。
通过netcat工具向本机的44444端口发送数据–>Flume监控本机的44444端口,通过Flume的source端读取数据–>Flume把获取的数据通过SInk端写出到控制台。
- //下载netcat
- sudo yum install -y nc
-
- //开启服务端端口
- nc -lk 9999
-
- //开启服务端进行通信
- nc localhost 9999
sudo netstat -nlp | grep 44444
# Name the components on this agent #组件声明 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port =44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel whichbuffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- //方式一
- bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
-
- //方式二
- bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数说明:
nc localhost 44444
hello
实时监控Hive日志,并上传到HDFS中。
要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #定义source类型为exec可执行命令的 a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round= true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件(单位s) a2.sinks.k2.hdfs.rollInterval = 60 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event 数量无关 a2.sinks.k2.hdfs.rollCount = 0 # Use a channelwhich buffers eventsin memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
- bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-flume-hdfs.conf
- //或用
- bin/flume-ng agent -n a2 -c conf/ -f job/file-flume-hdfs.conf
Exec source 适用于监控一个实时追加的文件。
缺点:tail只能传最后10条数据,不支持断点续传
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source #定义source类型为目录 a3.sources.r3.type = spooldir #监控目录 a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload #定义文件上传完的后缀 a3.sources.r3.fileSuffix = .COMPLETED #是否有文件头 a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type= hdfs a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round= true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize= 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event 数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-flume-hdfs.conf
在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
touch test.txt
touch text.tmp
touch test.log
Spooldir Source适合用于同步新文件。
缺点:不适合对实时追加日志的文件进行监听并同步
Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。
Position File 的格式如下:
- {"inode":2496272,"pos":12,"file":"/opt/module/flume-1.9.0/files/file1.txt"}
- {"inode":2496275,"pos":12,"file":"/opt/module/flume-1.9.0/files/file2.txt"}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
-
- # Describe/configure the source
- #定义source类型
- a3.sources.r3.type = TAILDIR
- #指定position——file的位置
- a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
- a3.sources.r3.filegroups = f1 f2
- #定义监控目录文件
- a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
- a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
-
- # Describe the sink
- a3.sinks.k3.type= hdfs
- a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload2/%Y%m%d/%H
- #上传文件的前缀
- a3.sinks.k3.hdfs.filePrefix = upload-
- #是否按照时间滚动文件夹
- a3.sinks.k3.hdfs.round = true
- #多少时间单位创建一个新的文件夹
- a3.sinks.k3.hdfs.roundValue = 1
- #重新定义时间单位
- a3.sinks.k3.hdfs.roundUnit = hour
- #是否使用本地时间戳
- a3.sinks.k3.hdfs.useLocalTimeStamp = true
- #积攒多少个 Event才 flush 到 HDFS 一次
- a3.sinks.k3.hdfs.batchSize= 100
- #设置文件类型,可支持压缩
- a3.sinks.k3.hdfs.fileType = DataStream
- #多久生成一个新的文件
- a3.sinks.k3.hdfs.rollInterval= 60
- #设置每个文件的滚动大小大概是 128M
- a3.sinks.k3.hdfs.rollSize = 134217700
- #文件的滚动与Event 数量无关
- a3.sinks.k3.hdfs.rollCount = 0
-
- # Use a channelwhich buffers eventsin memory
- a3.channels.c3.type= memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 100
-
- # Bind the sourceand sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf
#files文件夹下
touch file1.txt
touch file2.txt
echo hello >> file1.txt
echo lyx >> file2.txt#files2文件夹下
touch file3.log
echo hi >> file3.log
上述存在一个问题:
若遇到文件定期更改文件名,并且重新创建一个新原始文件的名字的文件,监控到并上传的数据将是累积的文件内容,并不是更新的内容数据,导致数据重复。
解决:
修改源码中更新和读取的操作,然后将修改好的文件打包下载到本地,再上传到lib目录下进行源码替换,重新启动监控文件夹命令
(1)更新操作:只看inode的值。
(2)读取操作:只要文件是新文件,只看inode的值,不看绝对路径。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。