当前位置:   article > 正文

Flume安装部署以及简单案例入门_部署flume

部署flume

目录

一、Flume安装部署

1、Flume 相关地址

2、安装部署

二、Flume入门案例

1、监控端口数据官方案例

   1.1、需求:

   1.2、分析

   1.3、步骤如下

       1.3.1、安装使用netcat

        1.3.2、判断44444端口是否被占用

        1.3.3、在flume-1.9.0下创建job文件夹,在job下创建Flume Agent配置文件netcat-flume-logger.conf(表示数据源-经过flume-到sink),添加以下内容

     1.3.4、开启flume监听端口

  1.3.5、使用netcat向本机的44444端口发送内容

  1.3.6、在Flume监听页面观察接受数据情况 

2、实时监控单个追加文件

2.1、需求:

2.2、分析

 2.3、步骤

1)在job目录下创建文件:file-flume-hdfs.conf

2)运行Flume

3)开启Hadoop 和 Hive 并操作 Hive 产生日志

3、实时监控目录下多个新文件

3.1、需求:使用Flume监听整个目录的文件,并上传到HDFS

3.2、分析

3.3、步骤

3.3.1、在job目录下创建配置文件dir-flume-hdfs.conf,并添加以下内容

3.3.2、启动监控文件命令

3.3.3、向 upload 文件夹中添加文件,在/opt/module/flume-1.9.0/ 目录下创建 upload 文件夹,并添加以下文件

4、实时监控目录下的多个追加文件

4.1、需求:使用Flume监听整个目录的实时追加文件,并上传到HDFS

4.2、分析

​编辑      

4.3、步骤

4.3.1、在job目录下创建配置文件taildir-flume-hdfs.conf,并添加以下内容

4.3.2、启动监控文件夹命令

4.3.3、向files文件加追加内容,在/opt/module/flume-1.9.0目录下创建files和files2文件夹,向该文件夹中添加文件

 4.3.4、查看HDFS上的数据


一、Flume安装部署

1、Flume 相关地址

(1) Flume 官网地址http://flume.apache.org/
(2)文档查看地址http://flume.apache.org/FlumeUserGuide.html
(3)下载地址http://archive.apache.org/dist/flume/

2、安装部署

(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下。
(2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下。

tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

(3)修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0 

 (4) 将 flume-1.9.0/conf 下 的 flume-env.sh.template 文 件 修 改 为 flume-env.sh , 并配 置flume-env.sh 文件:

 mv flume-env.sh.template flume-env.sh
 vim flume-env.sh
 export JAVA_HOME=/opt/module/jdk1.8.0_271

  (5) 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

[kgf@hadoop104 flume-1.9.0]$ rm -rf /opt/module/flume-1.9.0/lib/guava-11.0.2.jar 


二、Flume入门案例

1、监控端口数据官方案例

   1.1、需求:

        使用Flume监听一个端口,收集该端口数据,并打印到控制台。

   1.2、分析

         通过netcat工具向本机的44444端口发送数据–>Flume监控本机的44444端口,通过Flume的source端读取数据–>Flume把获取的数据通过SInk端写出到控制台。

   1.3、步骤如下

       1.3.1、安装使用netcat
  1. //下载netcat
  2. sudo yum install -y nc
  3. //开启服务端端口
  4. nc -lk 9999
  5. //开启服务端进行通信
  6. nc localhost 9999
        1.3.2、判断44444端口是否被占用

sudo netstat -nlp | grep 44444

        1.3.3、在flume-1.9.0下创建job文件夹,在job下创建Flume Agent配置文件netcat-flume-logger.conf(表示数据源-经过flume-到sink),添加以下内容
  1. # Name the components on this agent
  2. #组件声明
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = netcat
  8. a1.sources.r1.bind = localhost
  9. a1.sources.r1.port =44444
  10. # Describe the sink
  11. a1.sinks.k1.type = logger
  12. # Use a channel whichbuffers events in memory
  13. a1.channels.c1.type = memory
  14. a1.channels.c1.capacity = 1000
  15. a1.channels.c1.transactionCapacity = 100
  16. # Bind the source and sink to the channel
  17. a1.sources.r1.channels = c1
  18. a1.sinks.k1.channel = c1

     1.3.4、开启flume监听端口
  1. //方式一
  2. bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
  3. //方式二
  4. bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console

参数说明:

  • (i)–conf/-c:配置文件在conf/目录。
  • (ii)–name/-n:表示给agent起名为a1。
  • (iii)–conf-file/-f:flume本次启动读取的配置文件在job目录下的netcat-flume-logger.conf文件。
  • (iv)-Dflume.root.logger-INFO,console:-D 表示 flume 运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。
  1.3.5、使用netcat向本机的44444端口发送内容

nc localhost 44444
hello

  1.3.6、在Flume监听页面观察接受数据情况 

2、实时监控单个追加文件

2.1、需求:

        实时监控Hive日志,并上传到HDFS中。

2.2、分析

 2.3、步骤

1)在job目录下创建文件:file-flume-hdfs.conf

        要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。

  1. # Name the components on this agent
  2. a2.sources = r2
  3. a2.sinks = k2
  4. a2.channels = c2
  5. # Describe/configure the source
  6. #定义source类型为exec可执行命令的
  7. a2.sources.r2.type = exec
  8. a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
  9. # Describe the sink
  10. a2.sinks.k2.type = hdfs
  11. a2.sinks.k2.hdfs.path = hdfs://hadoop102/flume/%Y%m%d/%H
  12. #上传文件的前缀
  13. a2.sinks.k2.hdfs.filePrefix = logs-
  14. #是否按照时间滚动文件夹
  15. a2.sinks.k2.hdfs.round= true
  16. #多少时间单位创建一个新的文件夹
  17. a2.sinks.k2.hdfs.roundValue = 1
  18. #重新定义时间单位
  19. a2.sinks.k2.hdfs.roundUnit = hour
  20. #是否使用本地时间戳
  21. a2.sinks.k2.hdfs.useLocalTimeStamp = true
  22. #积攒多少个Event 才 flush 到 HDFS 一次
  23. a2.sinks.k2.hdfs.batchSize = 100
  24. #设置文件类型,可支持压缩
  25. a2.sinks.k2.hdfs.fileType = DataStream
  26. #多久生成一个新的文件(单位s)
  27. a2.sinks.k2.hdfs.rollInterval = 60
  28. #设置每个文件的滚动大小
  29. a2.sinks.k2.hdfs.rollSize = 134217700
  30. #文件的滚动与Event 数量无关
  31. a2.sinks.k2.hdfs.rollCount = 0
  32. # Use a channelwhich buffers eventsin memory
  33. a2.channels.c2.type = memory
  34. a2.channels.c2.capacity = 1000
  35. a2.channels.c2.transactionCapacity = 100
  36. # Bind the source and sink to the channel
  37. a2.sources.r2.channels = c2
  38. a2.sinks.k2.channel = c2
2)运行Flume
  1. bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-flume-hdfs.conf
  2. //或用
  3. bin/flume-ng agent -n a2 -c conf/ -f job/file-flume-hdfs.conf
3)开启Hadoop 和 Hive 并操作 Hive 产生日志

    Exec source 适用于监控一个实时追加的文件
    缺点:tail只能传最后10条数据,不支持断点续传


3、实时监控目录下多个新文件

3.1、需求:使用Flume监听整个目录的文件,并上传到HDFS

3.2、分析

3.3、步骤

3.3.1、在job目录下创建配置文件dir-flume-hdfs.conf,并添加以下内容
  1. a3.sources = r3
  2. a3.sinks = k3
  3. a3.channels = c3
  4. # Describe/configure the source
  5. #定义source类型为目录
  6. a3.sources.r3.type = spooldir
  7. #监控目录
  8. a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
  9. #定义文件上传完的后缀
  10. a3.sources.r3.fileSuffix = .COMPLETED
  11. #是否有文件头
  12. a3.sources.r3.fileHeader = true
  13. #忽略所有以.tmp结尾的文件,不上传
  14. a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
  15. # Describe the sink
  16. a3.sinks.k3.type= hdfs
  17. a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload/%Y%m%d/%H
  18. #上传文件的前缀
  19. a3.sinks.k3.hdfs.filePrefix = upload-
  20. #是否按照时间滚动文件夹
  21. a3.sinks.k3.hdfs.round= true
  22. #多少时间单位创建一个新的文件夹
  23. a3.sinks.k3.hdfs.roundValue = 1
  24. #重新定义时间单位
  25. a3.sinks.k3.hdfs.roundUnit = hour
  26. #是否使用本地时间戳
  27. a3.sinks.k3.hdfs.useLocalTimeStamp = true
  28. #积攒多少个 Event 才 flush 到 HDFS 一次
  29. a3.sinks.k3.hdfs.batchSize= 100
  30. #设置文件类型,可支持压缩
  31. a3.sinks.k3.hdfs.fileType = DataStream
  32. #多久生成一个新的文件
  33. a3.sinks.k3.hdfs.rollInterval = 60
  34. #设置每个文件的滚动大小大概是 128M
  35. a3.sinks.k3.hdfs.rollSize = 134217700
  36. #文件的滚动与Event 数量无关
  37. a3.sinks.k3.hdfs.rollCount = 0
  38. # Use a channel which buffers events in memory
  39. a3.channels.c3.type = memory
  40. a3.channels.c3.capacity = 1000
  41. a3.channels.c3.transactionCapacity = 100
  42. # Bind the source and sink to the channel
  43. a3.sources.r3.channels = c3
  44. a3.sinks.k3.channel = c3
3.3.2、启动监控文件命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-flume-hdfs.conf

         在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。

3.3.3、向 upload 文件夹中添加文件,在/opt/module/flume-1.9.0/ 目录下创建 upload 文件夹,并添加以下文件

touch test.txt
touch text.tmp
touch test.log

 

 Spooldir Source适合用于同步新文件。
缺点:不适合对实时追加日志的文件进行监听并同步


4、实时监控目录下的多个追加文件

Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
Taildir 说明:

        Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。

Position File 的格式如下:

  1. {"inode":2496272,"pos":12,"file":"/opt/module/flume-1.9.0/files/file1.txt"}
  2. {"inode":2496275,"pos":12,"file":"/opt/module/flume-1.9.0/files/file2.txt"}

  注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

4.1、需求:使用Flume监听整个目录的实时追加文件,并上传到HDFS

4.2、分析

      

4.3、步骤

4.3.1、在job目录下创建配置文件taildir-flume-hdfs.conf,并添加以下内容
  1. a3.sources = r3
  2. a3.sinks = k3
  3. a3.channels = c3
  4. # Describe/configure the source
  5. #定义source类型
  6. a3.sources.r3.type = TAILDIR
  7. #指定position——file的位置
  8. a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
  9. a3.sources.r3.filegroups = f1 f2
  10. #定义监控目录文件
  11. a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
  12. a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
  13. # Describe the sink
  14. a3.sinks.k3.type= hdfs
  15. a3.sinks.k3.hdfs.path =hdfs://hadoop102/flume/upload2/%Y%m%d/%H
  16. #上传文件的前缀
  17. a3.sinks.k3.hdfs.filePrefix = upload-
  18. #是否按照时间滚动文件夹
  19. a3.sinks.k3.hdfs.round = true
  20. #多少时间单位创建一个新的文件夹
  21. a3.sinks.k3.hdfs.roundValue = 1
  22. #重新定义时间单位
  23. a3.sinks.k3.hdfs.roundUnit = hour
  24. #是否使用本地时间戳
  25. a3.sinks.k3.hdfs.useLocalTimeStamp = true
  26. #积攒多少个 Event才 flush 到 HDFS 一次
  27. a3.sinks.k3.hdfs.batchSize= 100
  28. #设置文件类型,可支持压缩
  29. a3.sinks.k3.hdfs.fileType = DataStream
  30. #多久生成一个新的文件
  31. a3.sinks.k3.hdfs.rollInterval= 60
  32. #设置每个文件的滚动大小大概是 128M
  33. a3.sinks.k3.hdfs.rollSize = 134217700
  34. #文件的滚动与Event 数量无关
  35. a3.sinks.k3.hdfs.rollCount = 0
  36. # Use a channelwhich buffers eventsin memory
  37. a3.channels.c3.type= memory
  38. a3.channels.c3.capacity = 1000
  39. a3.channels.c3.transactionCapacity = 100
  40. # Bind the sourceand sink to the channel
  41. a3.sources.r3.channels = c3
  42. a3.sinks.k3.channel = c3
4.3.2、启动监控文件夹命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf

4.3.3、向files文件加追加内容,在/opt/module/flume-1.9.0目录下创建files和files2文件夹,向该文件夹中添加文件

#files文件夹下
touch file1.txt
touch file2.txt
echo hello >> file1.txt
echo lyx >> file2.txt

#files2文件夹下
touch file3.log
echo hi >> file3.log

 4.3.4、查看HDFS上的数据

上述存在一个问题:
  若遇到文件定期更改文件名,并且重新创建一个新原始文件的名字的文件,监控到并上传的数据将是累积的文件内容,并不是更新的内容数据,导致数据重复。
解决:
  修改源码中更新和读取的操作,然后将修改好的文件打包下载到本地,再上传到lib目录下进行源码替换,重新启动监控文件夹命令

(1)更新操作:只看inode的值。

(2)读取操作:只要文件是新文件,只看inode的值,不看绝对路径。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/88927
推荐阅读
相关标签
  

闽ICP备14008679号