赞
踩
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。
骚戴理解:注意这里是日志采集,也就是只能采集文本类型的数据!Flume的作用的特点就是可以实时采集!
Flume 组成架构如下图所示
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有 3 个部分组成,Source、Channel、Sink。
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。
(1)Flume 官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/
2.1.2 安装部署
(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下
(2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3)修改 apache-flume-1.9.0-bin 的名称为 flume
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin/opt/module/flume
(4)将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jar
1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
2)需求分析:
3)实现步骤:
(1)安装 netcat 工具
[atguigu@hadoop102 software]$ sudo yum install -y nc
骚戴理解:在yum命令中,-y选项表示在安装软件包时自动确认所有提示信息,而不需要手动输入"yes"或回车键确认。这使得批量安装软件包更加方便和快捷。因此,在执行 yum install -y 命令时,系统会默认确认所有的提示信息。
(2)判断 44444 端口是否被占用
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444
(3)创建 Flume Agent 配置文件 flume-netcat-logger.conf
(4)在 flume 目录下创建 job 文件夹并进入 job 文件夹。
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/
(5)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf
(6)在 flume-netcat-logger.conf 文件中添加如下内容。
添加内容如下:
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- # Describe the sink
- a1.sinks.k1.type = logger
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100 事务容量
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html
骚戴理解:注意下面的第一个channel是有s的,第二个是没有s的,说明sources可以有多个channel,但是sink只能和一个channel相连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(7)开启 flume 监听端口(第二个开启)
第一种写法:
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a1 --conf-file job/flume-netcat-logger.conf -
- Dflume.root.logger=INFO,console
第二种写法(推荐):
- [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
- job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf/-c:表示配置文件存储在 conf/目录
--name/-n:表示给 agent 起名为 a1( agent 起名要和上面的flume-netcat-logger.conf保持一致)
--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的flume-netcat-logger.conf文件。
-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。
骚戴理解:上面的-c是指的flume自己的配置文件conf的相对路径,而不是自己写的flume-netcat-logger.conf文件的路径
(8)使用 netcat 工具开启服务端(提前开启)
[atguigu@hadoop102 ~]$ nc -lk 44444
(9)使用 netcat 工具向本机的 44444 端口发送内容
- [atguigu@hadoop102 ~]$ nc localhost 44444
- hello
- atguigu
(10)在 Flume 监听页面观察接收数据情况
1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中
2)需求分析:
3)实现步骤:
(1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包
检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
- JAVA_HOME=/opt/module/jdk1.8.0_212
- HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
- PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
- export PATH JAVA_HOME HADOOP_HOME
(2)创建 flume-file-hdfs.conf 文件
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
添加如下内容
- # Name the components on this agent
- a2.sources = r2
- a2.sinks = k2
- a2.channels = c2
-
- # Describe/configure the source
- a2.sources.r2.type = exec
- a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
-
- # Describe the sink
- a2.sinks.k2.type = hdfs
- a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
- #上传文件的前缀
- a2.sinks.k2.hdfs.filePrefix = logs-
-
- #是否按照时间滚动文件夹
- a2.sinks.k2.hdfs.round = true
- #多少时间单位创建一个新的文件夹
- a2.sinks.k2.hdfs.roundValue = 1
- #重新定义时间单位
- a2.sinks.k2.hdfs.roundUnit = hour
- #是否使用本地时间戳
- a2.sinks.k2.hdfs.useLocalTimeStamp = true
-
- #积攒多少个 Event 才 flush 到 HDFS 一次
- a2.sinks.k2.hdfs.batchSize = 100
- #设置文件类型,可支持压缩
- a2.sinks.k2.hdfs.fileType = DataStream
-
- #多久生成一个新的文件
- a2.sinks.k2.hdfs.rollInterval = 60
- #设置每个文件的滚动大小
- a2.sinks.k2.hdfs.rollSize = 134217700
- #文件的滚动与 Event 数量无关
- a2.sinks.k2.hdfs.rollCount = 0
-
- # Use a channel which buffers events in memory
- a2.channels.c2.type = memory
- a2.channels.c2.capacity = 1000
- a2.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r2.channels = c2
- a2.sinks.k2.channel = c2
骚戴理解:
要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。
tail -F /opt/module/hive/logs/hive.log它的意思是实时跟踪并输出/logs/hive.log文件的末尾(即最新的日志信息),并且在文件发生变化时会自动更新。其中,“-F”参数表示跟踪文件,而不是简单地读取文件。
a2.sinks.k2.hdfs.path中的%Y%m%d/和 %H 是时间格式化字符串,表示按照年月日时刻创建不同的目录存储 flume 数据。例如,如果 flume 在 2022 年 10 月 1 日 12 点启动,那么数据将被写入到hdfs://hadoop102:9820/flume/20221001/12/ 目录下
a2.sinks.k2.hdfs.round = true和a2.sinks.k2.hdfs.roundValue = 1和a2.sinks.k2.hdfs.roundUnit = hour的意思就是每一个小时就生成一个新的文件夹,然后后面的数据就都放到这个新文件夹的文件里,注意这里是生成新的文件夹,不是文件!后面的配置才是文件!
a2.sinks.k2.hdfs.useLocalTimeStamp = true这个必须开启,因为前面的a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H配置需要获取当前时间,也就是时间戳
a2.sinks.k2.hdfs.fileType 支持的类型有如下几种:
SequenceFile:SequenceFile 是 Hadoop 的二进制文件格式之一,用于存储序列化的键值对。它支持压缩和快速随机访问,适合于大数据集的存储和处理。
DataStream:DataStream 是 Hadoop 2.0 中新引入的一种文件格式,它支持追加写入和并发读取,并且不需要关闭文件流即可完成文件写入。这使得 DataStream 适合于实时数据流的处理。
Text:Text 是 Hadoop 最简单的文本文件格式,每行都是一个文本字符串。因为 Text 文件没有压缩和索引,所以适合小型数据集的存储和处理。
Avro:Avro 是一种数据交换格式,支持动态数据模式,可以根据需要添加、删除或更改字段。在 Hadoop 中,Avro 格式通常用于序列化复杂数据结构,例如 MapReduce 的输入和输出数据。
a2.sinks.k2.hdfs.rollInterval = 60的单位是秒,60s生成一个新文件,a2.sinks.k2.hdfs.rollSize = 134217700是接近128M生成一个新文件,a2.sinks.k2.hdfs.rollCount = 0表示不设置,以上三个参数可以看作一个整体,只要有一个设置了,就可以生效,如果等于0那就是不设置
注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加timestamp)。a3.sinks.k3.hdfs.useLocalTimeStamp = true
(3)运行 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
- [atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
- [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
- [atguigu@hadoop102 hive]$ bin/hive
(5)在 HDFS 上查看文件。
1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS
2)需求分析:
3)实现步骤:
(1)创建配置文件 flume-dir-hdfs.conf
创建一个文件
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
添加如下内容
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
-
- # Describe/configure the source
- a3.sources.r3.type = spooldir
- a3.sources.r3.spoolDir = /opt/module/flume/upload
- a3.sources.r3.fileSuffix = .COMPLETED
- #是否有头文件
- a3.sources.r3.fileHeader = true
- #忽略所有以.tmp 结尾的文件,不上传
- a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
-
- # Describe the sink
- a3.sinks.k3.type = hdfs
- a3.sinks.k3.hdfs.path =
- hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
- #上传文件的前缀
- a3.sinks.k3.hdfs.filePrefix = upload-
- #是否按照时间滚动文件夹
- a3.sinks.k3.hdfs.round = true
- #多少时间单位创建一个新的文件夹
- a3.sinks.k3.hdfs.roundValue = 1
- #重新定义时间单位
- a3.sinks.k3.hdfs.roundUnit = hour
- #是否使用本地时间戳
- a3.sinks.k3.hdfs.useLocalTimeStamp = true
- #积攒多少个 Event 才 flush 到 HDFS 一次
- a3.sinks.k3.hdfs.batchSize = 100
- #设置文件类型,可支持压缩
- a3.sinks.k3.hdfs.fileType = DataStream
- #多久生成一个新的文件
- a3.sinks.k3.hdfs.rollInterval = 60
- #设置每个文件的滚动大小大概是 128M
- a3.sinks.k3.hdfs.rollSize = 134217700
- #文件的滚动与 Event 数量无关
- a3.sinks.k3.hdfs.rollCount = 0
- # Use a channel which buffers events in memory
- a3.channels.c3.type = memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a3 --conf-file job/flume-dir-hdfs.conf
注意:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
(3)向 upload 文件夹中添加文件
在/opt/module/flume 目录下创建 upload 文件夹
[atguigu@hadoop102 flume]$ mkdir upload
向 upload 文件夹中添加文件
- [atguigu@hadoop102 upload]$ touch atguigu.txt
- [atguigu@hadoop102 upload]$ touch atguigu.tmp
- [atguigu@hadoop102 upload]$ touch atguigu.log
(4)查看 HDFS 上的数据
Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
而 Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
1)案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
2)需求分析:
3)实现步骤:
(1)创建配置文件 flume-taildir-hdfs.conf
创建一个文件
[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf
添加如下内容
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
-
- # Describe/configure the source
- a3.sources.r3.type = TAILDIR
- a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
- a3.sources.r3.filegroups = f1 f2
- a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
- a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
-
- # Describe the sink
- a3.sinks.k3.type = hdfs
- a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
- #上传文件的前缀
- a3.sinks.k3.hdfs.filePrefix = upload-
- #是否按照时间滚动文件夹
- a3.sinks.k3.hdfs.round = true
- #多少时间单位创建一个新的文件夹
- a3.sinks.k3.hdfs.roundValue = 1
- #重新定义时间单位
- a3.sinks.k3.hdfs.roundUnit = hour
- #是否使用本地时间戳
- a3.sinks.k3.hdfs.useLocalTimeStamp = true
- #积攒多少个 Event 才 flush 到 HDFS 一次
- a3.sinks.k3.hdfs.batchSize = 100
- #设置文件类型,可支持压缩
- a3.sinks.k3.hdfs.fileType = DataStream
- #多久生成一个新的文件
- a3.sinks.k3.hdfs.rollInterval = 60
- #设置每个文件的滚动大小大概是 128M
- a3.sinks.k3.hdfs.rollSize = 134217700
- #文件的滚动与 Event 数量无关
- a3.sinks.k3.hdfs.rollCount = 0
- # Use a channel which buffers events in memory
- a3.channels.c3.type = memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
骚戴理解:
a3.sources.r3.type是apache flume中用于指定数据源(source)的类型的属性。以下是a3.sources.r3.type可能的值列表:
avro:avrosource,从avro客户端接收事件
netcat:netcatsource,从tcp/ip套接字接收事件
exec:execsource,运行进程并读取其stdout作为事件
spooldir:spooldirectorysource,监视包含事件文件的目录,并将文件内容发送到通道
syslog:syslogtcpsource或syslogudpsource,从tcp或udp端口接收syslog事件
thrift:thriftsource,从thrift客户端接收事件
http:httpsource,从http客户端接收事件
jms:jms source,监听jms队列以接收消息
kafka:kafka source,从kafka topic接收消息
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json的意思是设置断点信息的存储路径,这里面记录着各种实时同步的信息,如果异常或者宕机导致同步失败了就可以从这里记录的断点继续同步,而不需要重头开始!
以下三句的意思是定义两个文件组,目的就是用了监控多个目录中的文件,f1就监控files里文件名中包含file的文件,f2就监控files里文件名中包含log的文件,如果文件有变化,追加了信息,那就同步到hdfs
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
(2)启动监控文件夹命令
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
在/opt/module/flume 目录下创建 files 文件夹
[atguigu@hadoop102 flume]$ mkdir files
向 upload 文件夹中添加文件
- [atguigu@hadoop102 files]$ echo hello >> file1.txt
- [atguigu@hadoop102 files]$ echo atguigu >> file2.txt
(4)查看 HDFS 上的数据
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。
Position File 的格式如下:
- {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.t
- xt"}
- {"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.t
- xt"}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
重要组件:
1)ChannelSelector
ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是Replicating(复制)和 Multiplexing(多路复用)。
Replicating会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。
骚戴理解:Replicating的意思是分发Event给每个Channel,在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。
2)SinkProcessor
SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessorDefaultSinkProcessor 对 应 的 是 单 个 的 Sink ,LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group(Sink组),LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。
骚戴理解:首先这里要知道一个Sink只能绑定一个Channel,但是一个Channel可以绑定多个Sink!
failoversinkprocessor 是一个用于 apache flume 的 sink 处理器,用于处理事件流并将其写入外部存储系统(如 hadoop、hbase 或 elasticsearch)。它被称为“故障切换”处理器,因为它具有在主要目标无法使用时自动切换到备用目标的能力。它需要给多个Sink定义一个优先级,值越大优先级越高,假如有三个Sink,设置优先级为100,50,10,然后假如优先级为100的Sink挂掉了,那么就会换到优先级50的Sink
通常情况下,flume 将事件传输到单个目标。但是,在某些情况下,这种方法可能导致数据丢失或停机时间增加。例如,如果目标系统不可用,则无法传输事件。这就是 failoversinkprocessor 可以派上用场的地方。failoversinkprocessor 允许您定义多个目标,如果主要目标失败,则会自动切换到备用目标。当主要目标恢复正常时,可以再次切换回主要目标。这确保了高可用性和数据完整性
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
骚戴理解:avro 是一种数据序列化系统,用于在不同的应用程序、编程语言和平台之间传输数据。它是由 apache 软件基金会开发的一种开放源代码项目,并且被广泛用于 hadoop 生态系统中,如使用 hadoop 的数据处理工具 hive 和 pig。
avro 通过使用二进制编码对数据进行序列化,因此比其他序列化系统(如 xml 或 json)更高效,更节省空间。此外,avro 模式支持动态生成,这意味着您可以定义数据结构并为其生成模式,而无需预先编译代码。另一个 avro 的优势在于其跨语言支持。avro 支持多种编程语言,包括 c、c++、python、ruby、java 和 javascript 等。这使得在不同的应用程序之间共享数据变得容易和灵活。
总之,avro 是一种用于跨平台、跨语言数据序列化的系统,具有高效性和灵活性等优点。
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。
聚合这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的 flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。
1)案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2)需求分析:
单数据源多出口案例(选择器)
3)实现步骤:
(1)准备工作
在/opt/module/flume/job 目录下创建 group1 文件夹
[atguigu@hadoop102 job]$ cd group1/
在/opt/module/datas/目录下创建 flume3 文件夹
[atguigu@hadoop102 datas]$ mkdir flume3
(2)创建 flume-file-flume.conf
配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。
编辑flume1的配置文件
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf
添加如下内容
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
- # 将数据流复制给所有 channel
- a1.sources.r1.selector.type = replicating
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
- a1.sources.r1.shell = /bin/bash -c
-
- # Describe the sink
- # sink 端的 avro 是一个数据发送者
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop102
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop102
- a1.sinks.k2.port = 4142
-
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1 c2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c2
(3)创建 flume-flume-hdfs.conf
配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
编辑flume2的配置文件
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
- 添加如下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- # source 端的 avro 是一个数据接收服务
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop102
- a2.sources.r1.port = 4141
- # Describe the sink
- a2.sinks.k1.type = hdfs
- a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
- #上传文件的前缀
- a2.sinks.k1.hdfs.filePrefix = flume2-
- #是否按照时间滚动文件夹
- a2.sinks.k1.hdfs.round = true
- #多少时间单位创建一个新的文件夹
- a2.sinks.k1.hdfs.roundValue = 1
- #重新定义时间单位
- a2.sinks.k1.hdfs.roundUnit = hour
- #是否使用本地时间戳
- a2.sinks.k1.hdfs.useLocalTimeStamp = true
- #积攒多少个 Event 才 flush 到 HDFS 一次
- a2.sinks.k1.hdfs.batchSize = 100
- #设置文件类型,可支持压缩
- a2.sinks.k1.hdfs.fileType = DataStream
- #多久生成一个新的文件
- a2.sinks.k1.hdfs.rollInterval = 30
- #设置每个文件的滚动大小大概是 128M
- a2.sinks.k1.hdfs.rollSize = 134217700
- #文件的滚动与 Event 数量无关
- a2.sinks.k1.hdfs.rollCount = 0
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
(4)创建 flume-flume-dir.conf
配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
编辑flume3的配置文件
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
添加如下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop102
- a3.sources.r1.port = 4142
- # Describe the sink
- a3.sinks.k1.type = file_roll
- a3.sinks.k1.sink.directory = /opt/module/data/flume3
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
(5)执行配置文件
分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a3 --conf-file job/group1/flume-flume-dir.conf
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a2 --conf-file job/group1/flume-flume-hdfs.conf
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a1 --conf-file job/group1/flume-file-flume.conf
(6)启动 Hadoop 和 Hive
- [atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
- [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
- [atguigu@hadoop102 hive]$ bin/hive
- hive (default)>
(7)检查 HDFS 上数据
(8)检查/opt/module/datas/flume3 目录中数据
- [atguigu@hadoop102 flume3]$ ll
- 总用量 8
- -rw-rw-r--. 1 atguigu atguigu 5942 5 月 22 00:09 1526918887550-3
1)案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。
2)需求分析
3)实现步骤
(1)准备工作
在/opt/module/flume/job 目录下创建 group2 文件夹
[atguigu@hadoop102 job]$ cd group2/
(2)创建 flume-netcat-flume.conf
配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2。
编辑flume1配置文件
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf
添加如下内容
- # Name the components on this agent
- a1.sources = r1
- a1.channels = c1
- a1.sinkgroups = g1
- a1.sinks = k1 k2
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
-
- a1.sinkgroups.g1.processor.type = failover
- a1.sinkgroups.g1.processor.priority.k1 = 5
- a1.sinkgroups.g1.processor.priority.k2 = 10
- a1.sinkgroups.g1.processor.maxpenalty = 10000
-
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop102
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop102
- a1.sinks.k2.port = 4142
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinkgroups.g1.sinks = k1 k2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c1
骚戴理解:
a1.sinkgroups.g1.processor.type=failover: 这告诉flume使用failover processor来处理该管道中的事件。当主通道不可用时,failover processor会自动切换到备用通道。
a1.sinkgroups.g1.processor.priority.k1=5: 这定义了备用通道的优先级。在此示例中,备用通道具有较低的优先级,因为其值较小。
a1.sinkgroups.g1.processor.priority.k2=10: 这定义了主要通道的优先级。在此示例中,主通道具有更高的优先级,因为其值较大。
a1.sinkgroups.g1.processor.maxpenalty=10000: 这指定在主通道恢复之前尝试连接备用通道的最大惩罚时间(以毫秒为单位)。如果主通道在这段时间内没有恢复,则failover processor将永久地使用备用通道。
(3)创建 flume-flume-console1.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑flume2配置文件
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf
添加如下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop102
- a2.sources.r1.port = 4141
- # Describe the sink
- a2.sinks.k1.type = logger
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
(4)创建 flume-flume-console2.conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑flume3配置文件
- [atguigu@hadoop102 group2]$ vim flume-flume-console2.conf
- 添加如下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop102
- a3.sources.r1.port = 4142
- # Describe the sink
- a3.sinks.k1.type = logger
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
(5)执行配置文件
分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume。
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a3 --conf-file job/group2/flume-flume-console2.conf -
- Dflume.root.logger=INFO,console
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a2 --conf-file job/group2/flume-flume-console1.conf -
- Dflume.root.logger=INFO,console
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a1 --conf-file job/group2/flume-netcat-flume.conf
(6)使用 netcat 工具向本机的 44444 端口发送内容
$ nc localhost 44444
(7)查看 Flume2 及 Flume3 的控制台打印日志
(8)将 Flume2 kill杀掉,观察 Flume3 的控制台打印情况。
注:使用 jps -ml 查看 Flume 进程。
1)案例需求:
hadoop102 上的 Flume-1 监控文件/opt/module/group.log,hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。
2)需求分析
多数据源汇总案例
3)实现步骤:
(1)准备工作
分发 Flume
[atguigu@hadoop102 module]$ xsync flume
在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。
- [atguigu@hadoop102 job]$ mkdir group3
- [atguigu@hadoop103 job]$ mkdir group3
- [atguigu@hadoop104 job]$ mkdir group3
(2)创建 flume1-logger-flume.conf
配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。在 hadoop102 上编辑配置文件
- [atguigu@hadoop102 group3]$ vim flume1-logger-flume.conf
- 添加如下内容
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /opt/module/group.log
- a1.sources.r1.shell = /bin/bash -c
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop104
- a1.sinks.k1.port = 4141
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
(3)创建 flume2-netcat-flume.conf
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:在 hadoop103 上编辑配置文件
[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf
添加如下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- a2.sources.r1.type = netcat
- a2.sources.r1.bind = hadoop103
- a2.sources.r1.port = 44444
- # Describe the sink
- a2.sinks.k1.type = avro
- a2.sinks.k1.hostname = hadoop104
- a2.sinks.k1.port = 4141
- # Use a channel which buffers events in memory
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
(4)创建 flume3-flume-logger.conf
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。
在 hadoop104 上编辑配置文件
- [atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
- [atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf
添加如下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c1
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop104
- a3.sources.r1.port = 4141
- # Describe the sink
- # Describe the sink
- a3.sinks.k1.type = logger
- # Describe the channel
- a3.channels.c1.type = memory
- a3.channels.c1.capacity = 1000
- a3.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c1
- a3.sinks.k1.channel = c1
骚戴理解:这里a3.sources.r1.bind = hadoop104和a3.sources.r1.port = 4141我一开始觉得应该有两个主机和两个端口,后面看前面两个Flume发现他们用的都是同一个主机和一个端口,所以这里只需要配置一个主机和一个端口
(5)执行配置文件
分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。
- [atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name
- a3 --conf-file job/group3/flume3-flume-logger.conf -
- Dflume.root.logger=INFO,console
- [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
- a2 --conf-file job/group3/flume1-logger-flume.conf
- [atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name
- a1 --conf-file job/group3/flume2-netcat-flume.conf
(6)在 hadoop103 上向/opt/module 目录下的 group.log 追加内容
[atguigu@hadoop103 module]$ echo 'hello' > group.log
(7)在 hadoop102 上向 44444 端口发送数据
[atguigu@hadoop102 flume]$ telnet hadoop102 44444
(8)检查 hadoop104 上数据
1)案例需求
使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
2)需求分析
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。
在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析系统(Channel)。
Interceptor和Multiplexing ChannelSelector案例
3)实现步骤
(1)创建一个 maven 项目,并引入以下依赖。
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.9.0</version>
- </dependency>
(2)定义 CustomInterceptor 类并实现 Interceptor 接口。
- package com.atguigu.interceptor;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- public class TypeInterceptor implements Interceptor {
- //声明一个存放事件的集合
- private List<Event> addHeaderEvents;
- @Override
- public void initialize() {
- //初始化存放事件的集合
- addHeaderEvents = new ArrayList<>();
- }
- //单个事件拦截
- @Override
- public Event intercept(Event event) {
- //1.获取事件中的头信息
- Map<String, String> headers = event.getHeaders();
- //2.获取事件中的 body 信息
- String body = new String(event.getBody());
- //3.根据 body 中是否有"atguigu"来决定添加怎样的头信息
- if (body.contains("atguigu")) {
- //4.添加头信息
- headers.put("type", "first");
- } else {
- //4.添加头信息
- headers.put("type", "second");
- }
- return event;
- }
- //批量事件拦截
- @Override
- public List<Event> intercept(List<Event> events) {
- //1.清空集合
- addHeaderEvents.clear();
- //2.遍历 events
- for (Event event : events) {
- //3.给每一个事件添加头信息
- addHeaderEvents.add(intercept(event));
- }
- //4.返回结果
- return addHeaderEvents;
- }
- @Override
- public void close() {
- }
-
- public static class Builder implements Interceptor.Builder {
- @Override
- public Interceptor build() {
- return new TypeInterceptor();
- }
- @Override
- public void configure(Context context) {
- }
- }
- }
骚戴理解: headers.put("type", "first");和 headers.put("type", "second");这里的K必须是一致的,因为后面Multiplexing的配置文件里会根据K的值来分发到不同的Channel,也就是下面 flume 配置文件里的a1.sources.r1.selector.header = type。
以下代码在实现拦截器的时候很容易漏掉,这个静态内部类主要是后面配置文件里需要用到它来构造这个拦截器对象,也就是 flume 配置文件里com.atguigu.flume.interceptor.CustomInterceptor$Builder的$Builder,$Builder其实就是调用这个拦截器的静态内部类
- public static class Builder implements Interceptor.Builder {
- @Override
- public Interceptor build() {
- return new TypeInterceptor();
- }
- @Override
- public void configure(Context context) {
- }
- }
(3)编辑 flume 配置文件
为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink)+,并配置相应的 ChannelSelector 和 interceptor。
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
-
- Multiplexing结构的拦截器配置
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
- a1.sources.r1.selector.type = multiplexing
- a1.sources.r1.selector.header = type
- a1.sources.r1.selector.mapping.first = c1
- a1.sources.r1.selector.mapping.second = c2
-
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop103
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type=avro
- a1.sinks.k2.hostname = hadoop104
- a1.sinks.k2.port = 4242
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Use a channel which buffers events in memory
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1 c2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c2
骚戴理解:
a1.sources.r1.interceptors = i1这里是定义拦截器的别名,可以有多个,方便下面配置引用
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
是定义了具体的拦截器是哪一个,通过全限定名类名然后反射去获取
a1.sources.r1.selector.type = multiplexing是定义了Source把Event分发给Channel的策略,也就是根据请求头的K的值来分发,这需要用到拦截器
a1.sources.r1.selector.header = type是定义请求头里的K,这个要和拦截器保持一致
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2
为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- a1.sources.r1.type = avro
- a1.sources.r1.bind = hadoop103
- a1.sources.r1.port = 4141
- a1.sinks.k1.type = logger
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.sinks.k1.channel = c1
- a1.sources.r1.channels = c1
为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- a1.sources.r1.type = avro
- a1.sources.r1.bind = hadoop104
- a1.sources.r1.port = 4242
- a1.sinks.k1.type = logger
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.sinks.k1.channel = c1
- a1.sources.r1.channels = c1
(4)分别在 hadoop102,hadoop103,hadoop104 上启动 flume 进程,注意先后顺序。
(5)在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。
(6)观察 hadoop103 和 hadoop104 打印的日志。
1)介绍
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义
MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
getBackOffSleepIncrement() //backoff 步长
getMaxBackOffSleepInterval()//backoff 最长时间
configure(Context context)//初始化 context(读取配置文件内容)
process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。
2)需求
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
自定义Source需求
3)分析
自定义Source需求分析
4)编码
(1)导入 pom 依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.9.0</version>
- </dependency>
(2)编写代码
- package com.atguigu;
- import org.apache.flume.Context;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.PollableSource;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.event.SimpleEvent;
- import org.apache.flume.source.AbstractSource;
- import java.util.HashMap;
- public class MySource extends AbstractSource implements
- Configurable, PollableSource {
- //定义配置文件将来要读取的字段
- private Long delay;
- private String field;
- //初始化配置信息
- @Override
- public void configure(Context context) {
- delay = context.getLong("delay");
- field = context.getString("field", "Hello!");
- }
- @Override
- public Status process() throws EventDeliveryException {
- try {
- //创建事件头信息
- HashMap<String, String> hearderMap = new HashMap<>();
- //创建事件
- SimpleEvent event = new SimpleEvent();
- //循环封装事件
- for (int i = 0; i < 5; i++) {
- //给事件设置头信息
- event.setHeaders(hearderMap);
- //给事件设置内容
- event.setBody((field + i).getBytes());
- //将事件写入 channel
- getChannelProcessor().processEvent(event);
- Thread.sleep(delay);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return Status.BACKOFF;
- }
- return Status.READY;
- }
- @Override
- public long getBackOffSleepIncrement() {
- return 0;
- }
- @Override
- public long getMaxBackOffSleepInterval() {
- return 0;
- }
- }
5)测试
(1)打包
将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
(2)配置文件
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = com.atguigu.MySource
- a1.sources.r1.delay = 1000
- #a1.sources.r1.field = atguigu
- # Describe the sink
- a1.sinks.k1.type = logger
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
(3)开启任务
- [atguigu@hadoop102 flume]$ pwd
- /opt/module/flume
- [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f
- job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
(4)结果展示
1)介绍
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。
2)需求
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
流程分析:
3)编码
- package com.atguigu;
- import org.apache.flume.*;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.sink.AbstractSink;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class MySink extends AbstractSink implements Configurable {
- //创建 Logger 对象
- private static final Logger LOG =
- LoggerFactory.getLogger(AbstractSink.class);
- private String prefix;
- private String suffix;
- @Override
- public Status process() throws EventDeliveryException {
- //声明返回值状态信息
- Status status;
- //获取当前 Sink 绑定的 Channel
- Channel ch = getChannel();
- //获取事务
- Transaction txn = ch.getTransaction();
- //声明事件
- Event event;
- //开启事务
- txn.begin();
-
- //读取 Channel 中的事件,直到读取到事件结束循环
- while (true) {
- event = ch.take();
- if (event != null) {
- break;
- }
- }
- try {
- //处理事件(打印)
- LOG.info(prefix + new String(event.getBody()) + suffix);
- //事务提交
- txn.commit();
- status = Status.READY;
- } catch (Exception e) {
- //遇到异常,事务回滚
- txn.rollback();
- status = Status.BACKOFF;
- } finally {
- //关闭事务
- txn.close();
- }
- return status;
- }
- @Override
- public void configure(Context context) {
- //读取配置文件内容,有默认值
- prefix = context.getString("prefix", "hello:");
- //读取配置文件内容,无默认值
- suffix = context.getString("suffix");
- }
- }
4)测试
(1)打包
将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
(2)配置文件
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
-
- # Describe the sink
- a1.sinks.k1.type = com.atguigu.MySink
- #a1.sinks.k1.prefix = atguigu:
- a1.sinks.k1.suffix = :atguigu
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
(3)开启任务
- [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f
- job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
- [atguigu@hadoop102 ~]$ nc localhost 44444
- hello
- OK
- atguigu
- OK
(4)结果展示
Ganglia 由 gmond、gmetad 和 gweb 三部分组成。gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。
gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
1)安装 ganglia
(1)规划
- hadoop102: web gmetad gmod
- hadoop103: gmod
- hadoop104: gmod
(2)在 102 103 104 分别安装 epel-release
[atguigu@hadoop102 flume]$ sudo yum -y install epel-release
(3)在 102 安装
- [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
- [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
- [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond
(4)在 103 和 104 安装
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond
2)在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf
[atguigu@hadoop102 flume]$ sudo vim
/etc/httpd/conf.d/ganglia.conf
修改为红颜色的配置:
- # Ganglia monitoring system php web frontend
- #
- Alias /ganglia /usr/share/ganglia
- <Location /ganglia>
- # Require local
- # 通过 windows 访问 ganglia,需要配置 Linux 对应的主机(windows)ip 地址
- Require ip 192.168.9.1
- # Require ip 10.1.2.3
- # Require host example.org
- </Location>
5)在 102 修改配置文件/etc/ganglia/gmetad.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
修改为:data_source "my cluster" hadoop102
6)在 102 103 104 修改配置文件/etc/ganglia/gmond.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
修改为:
- cluster {
- name = "my cluster"
- owner = "unspecified"
- latlong = "unspecified"
- url = "unspecified"
- }
- udp_send_channel {
- #bind_hostname = yes # Highly recommended, soon to be default.
- # This option tells gmond to use a source
- address
- # that resolves to the machine's hostname.
- Without
- # this, the metrics may appear to come from
- any
- # interface and the DNS names associated with
- # those IPs will be used to create the RRDs.
- # mcast_join = 239.2.11.71
- # 数据发送给 hadoop102
- host = hadoop102
- port = 8649
- ttl = 1
- }
-
- udp_recv_channel {
- # mcast_join = 239.2.11.71
- port = 8649
- # 接收来自任意连接的数据
- bind = 0.0.0.0
- retry_bind = true
- # Size of the UDP buffer. If you are handling lots of metrics
- you really
- # should bump it up to e.g. 10MB or even higher.
- # buffer = 10485760
- }
7)在 102 修改配置文件/etc/selinux/config
[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config
修改为:
- # This file controls the state of SELinux on the system.
- # SELINUX= can take one of these three values:
- # enforcing - SELinux security policy is enforced.
- # permissive - SELinux prints warnings instead of enforcing.
- # disabled - No SELinux policy is loaded.
- SELINUX=disabled
- # SELINUXTYPE= can take one of these two values:
- # targeted - Targeted processes are protected,
- # mls - Multi Level Security protection.
- SELINUXTYPE=targeted
尖叫提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:
[atguigu@hadoop102 flume]$ sudo setenforce 0
8)启动 ganglia
(1)在 102 103 104 启动
[atguigu@hadoop102 flume]$ sudo systemctl start gmond
(2)在 102 启动
- [atguigu@hadoop102 flume]$ sudo systemctl start httpd
- [atguigu@hadoop102 flume]$ sudo systemctl start gmetad
9)打开网页浏览 ganglia 页面
尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录
的权限:
[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia
1)启动 Flume 任务
- [atguigu@hadoop102 flume]$ bin/flume-ng agent \
- -c conf/ \
- -n a1 \
- -f job/flume-netcat-logger.conf \
- -Dflume.root.logger=INFO,console \
- -Dflume.monitoring.type=ganglia \
- -Dflume.monitoring.hosts=hadoop102:8649
2)发送数据观察 ganglia 监测图
[atguigu@hadoop102 flume]$ nc localhost 44444
样式如图:
图例说明:
使用第三方框架 Ganglia 实时监控 Flume。
型?
1)作用
(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
(2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。
(3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。
2)我公司采用的 Source 类型为:
(1)监控后台日志:exec
(2)监控后台产生日志的端口:netcat
Flume Channel Selectors
1)Source
增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。
batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。
2)Channel
type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。
使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。
Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大
event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。
3)Sink
增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行, 过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。
Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。
比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。
根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。
Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。