
Big Data: Flume


Introduction to Flume

Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data, provided by Cloudera. Flume lets you plug custom data senders into a logging pipeline: it can collect data from sources such as console, RPC (Thrift-RPC), text (files), tail (UNIX tail), syslog (the syslog logging system, in both TCP and UDP modes), exec (command execution), and more. At the same time, Flume can apply simple processing to the data and write it to a variety of (customizable) data receivers such as HDFS and HBase.

Flume Architecture

An agent is made up of three components: source, channel, and sink.
(1) Source
The source receives data from a data generator and hands it to one or more channels as Flume events. Flume supports many ways of receiving data, such as Avro, Thrift, and the Twitter 1% firehose.
(2) Channel
A channel is a transient storage container: it buffers the events received from the source until sinks consume them, acting as a bridge between source and sink. Channel operations are transactional, which guarantees consistency between sending and receiving, and a channel can be connected to any number of sources and sinks. Supported types include the JDBC channel, File channel, and Memory channel.
(3) Sink
The sink delivers data to a centralized store such as HBase or HDFS: it consumes events from the channel and passes them on to the destination, which may be another Flume agent or a store such as HDFS or HBase.
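
As a minimal sketch of how the three components are wired together in a configuration file (the names a1, r1, c1, and k1 are arbitrary labels; each component also needs a type and its own properties, as the examples below show):

a1.sources = r1
a1.channels = c1
a1.sinks = k1
# a source may feed several channels; a sink drains exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1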

Installing Flume

(1) Unpack the downloaded Flume archive into a directory of your choice.
(2) In the flume/conf/ directory, rename flume-env.sh.template to flume-env.sh (or copy it and rename the copy), then edit flume-env.sh and set the JAVA_HOME variable to your own JDK installation directory.
(3) Configure the environment variables by editing ~/.bashrc (optional; without it you must cd into flume/bin for every command, which is inconvenient):

export FLUME_HOME=(your Flume install location)
export PATH=$PATH:$FLUME_HOME/bin
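
After saving the file, reload it so the variables take effect in the current shell:

source ~/.bashrc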

(4) Run flume-ng version; if version information appears, the installation succeeded.

Common Flume Commands

  • help: print help, flume-ng help.
  • agent: start an agent.
  • --conf,-c: the configuration directory, usually flume/conf/.
  • --name,-n: the name of this agent; it must match the name used in your own configuration file, commonly just a1.
  • --conf-file,-f: the configuration file you wrote.
  • -Dflume.root.logger=INFO,console: print the run log to the console.

A complete run command: flume-ng agent --conf flume/conf/ --name a1 --conf-file data/flume/flume-2-kafka.conf -Dflume.root.logger=INFO,console
Or, with the short options: flume-ng agent -c flume/conf/ -n a1 -f data/flume/flume-2-kafka.conf -Dflume.root.logger=INFO,console

Flume Examples

1. Monitor a file and print newly appended data to the console in real time

Agent selection: exec source + memory channel + logger sink

The file is monitored with the tail -F command.

Create the configuration file flume-file-console.conf:

# the agent is named a1 here, so the same name must be used in the run command
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# the file to monitor; note that Flume config files are Java properties files,
# so comments must sit on their own line (a trailing comment becomes part of the value)
a1.sources.r1.command = tail -F /home/au/software/data/flume/test.log
# the command string is read by the shell from the argument after -c
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



Start the agent: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-file-console.conf -Dflume.root.logger=INFO,console

Test it by appending content to test.log (echo "hello" >> test.log); the console prints the new content:

# Console output:
2019-07-21 09:25:50,680 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] 
Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }


2. Collect data from a specified network port and print each line to the console

Agent selection: netcat source + memory channel + logger sink

Create the configuration file flume-telnet-logger.conf:

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
# the IP to bind (comments must sit on their own line here too)
a1.sources.r1.bind = localhost
# the port to listen on
a1.sources.r1.port = 44444

#Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


Start the agent: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-telnet-logger.conf -Dflume.root.logger=INFO,console

Test: telnet localhost 44444, then type any characters.
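
If telnet is not installed, nc can drive the same interactive test:

nc localhost 44444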

# Console output (the trailing 0D is the carriage return sent by telnet):
2019-07-21 09:38:34,496 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }


3. Monitor a file and deliver newly appended data to Kafka in real time

Agent selection: exec source + memory channel + Kafka sink

Create the configuration file flume-2-kafka.conf:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
# the file to monitor
a1.sources.r1.command = tail -F /home/au/software/data/flume/test.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address
a1.sinks.k1.kafka.bootstrap.servers = hadoopPD:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
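
If the ct topic does not exist yet, it can be created up front (Kafka 2.2+ syntax; older releases use --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server hadoopPD:9092 --replication-factor 1 --partitions 1 --topic ct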


Start the agent: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-2-kafka.conf

What remains is consuming the data on the Kafka side.
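
A quick end-to-end check is to read the topic with the Kafka console consumer:

kafka-console-consumer.sh --bootstrap-server hadoopPD:9092 --topic ct --from-beginning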

4. Listen on a TCP port and print newly arriving data to the console in real time

Agent selection: syslogtcp source + memory channel + logger sink

Create the configuration file flume-syslogtcp-logger.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
# the port to listen on
a1.sources.r1.port = 5140
# the IP to bind
a1.sources.r1.host = 0.0.0.0
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-syslogtcp-logger.conf -Dflume.root.logger=INFO,console

Generate a test syslog message: echo "hello world" | nc localhost 5140

# Console output:
2019-07-21 09:56:58,495 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] 
Event: { headers:{Severity=0, Facility=0, flume.syslog.status=Invalid} 
body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }
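
The flume.syslog.status=Invalid header appears because the plain string piped through nc is not a well-formed syslog message; the body is still delivered. To send a properly formatted message, the util-linux logger tool can be used (a sketch; these options require a reasonably recent logger):

logger -n localhost -P 5140 -T "hello world"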


5. Monitor a file and write newly appended data to HDFS in real time

Agent selection: exec source + memory channel + HDFS sink

Create the configuration file flume-file-hdfs.conf:

# Name the components on this agent
a1.sinks = k1
a1.sources = r1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/au/software/data/flume/test.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = hdfs
# change this to your own HDFS cluster
a1.sinks.k1.hdfs.path = hdfs://hadoopPD:9000/flume/%Y%m%d%H
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# batchSize must not exceed the channel's transactionCapacity (100 here)
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
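# roll files every 10 minutes or at ~128 MB, whichever comes first; rollCount = 0 disables count-based rolling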
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-file-hdfs.conf

Test by appending content to test.log: echo "hello" >> test.log
Then list the data in HDFS: hdfs dfs -ls /flume
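
Because of the %Y%m%d%H escapes, files land in hour-stamped directories; their contents can be inspected with something like this (the directory name here is hypothetical):

hdfs dfs -cat /flume/2019072109/log.*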

6. Collect logs from server A to server B in real time

Agent selection:
A: exec source + memory channel + avro sink
B: avro source + memory channel + logger sink

On server A, create the configuration file flume-file-avro.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/au/software/data/flume/test.log
a1.sources.r1.shell = /bin/sh -c

a1.sinks.k1.type = avro
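# server B's address; localhost only works when both agents run on the same machine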
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

On server B, create the configuration file flume-avro-logger.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agents:
A: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-file-avro.conf -Dflume.root.logger=INFO,console
B: flume-ng agent -c ~/software/flume/conf/ -n a1 -f flume-avro-logger.conf -Dflume.root.logger=INFO,console

Test: write data to test.log on server A; the result appears on server B's console.

7. Failover

Sink groups let you put several sinks into a single group. A sink processor can then provide load balancing across all the sinks in the group, or fail over from one sink to another when a transient failure occurs.

Desired behavior:
the machine keeps sending data to one sink, and when that sink becomes unavailable, the data is automatically sent to the next sink.

Steps:
1. On server1, create the configuration file Flume_Sink_Processors.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# the sink group is the key to configuring failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# the processor type is failover
a1.sinkgroups.g1.processor.type = failover
# priority: higher numbers mean higher priority; every sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# back a failed sink off for 10 seconds; tune this to your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sinks; both sinks in the group drain the same channel,
# so whichever one is currently active receives every event
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = server2
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = server3
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2. On server2, create the configuration file Flume_Sink_Processors_avro.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. On server3, create the same configuration file Flume_Sink_Processors_avro.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4. Start the agents on server1, server2, and server3 respectively:
server1: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Flume_Sink_Processors.conf -Dflume.root.logger=INFO,console
server2: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Flume_Sink_Processors_avro.conf -Dflume.root.logger=INFO,console
server3: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Flume_Sink_Processors_avro.conf -Dflume.root.logger=INFO,console

5. On server1, generate a test log message:
echo "test" | nc localhost 5140

6. Because server3 has a higher priority than server2, server1's log message shows up in server3's sink window, and not in server2's.

7. Stop the sink on server3 (Ctrl+C) and send the test data again (echo "test" | nc localhost 5140); this time server1's log message shows up in server2's sink window.

8. Restart the sink in server3's sink window:
server3: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Flume_Sink_Processors_avro.conf -Dflume.root.logger=INFO,console

9. Because of the priorities, log messages land on server3 once again.

8. Load balancing

Unlike failover, load balancing offers two selector options, round-robin and random. In either case, if the selected sink is unavailable, the processor automatically tries sending to the next available sink.

Steps:
1. On server1, create the configuration file Load_balancing_Sink_Processors.conf:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# the sink group is the key to configuring load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sinks; both drain the same channel so the processor
# can alternate between them (addresses of the avro sources on server2 and server3)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = server2
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = server3
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
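
To balance randomly instead of round-robin, only the selector line changes:

a1.sinkgroups.g1.processor.selector = random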

2. On server2, create the configuration file Load_balancing_Sink_Processors_avro.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. On server3, create the same configuration file Load_balancing_Sink_Processors_avro.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
  
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555
  
# Describe the sink
a1.sinks.k1.type = logger
  
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4. Start the agents on server1, server2, and server3 respectively:
server1: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Load_balancing_Sink_Processors.conf -Dflume.root.logger=INFO,console
server2: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Load_balancing_Sink_Processors_avro.conf -Dflume.root.logger=INFO,console
server3: flume-ng agent -c ~/software/flume/conf/ -n a1 -f Load_balancing_Sink_Processors_avro.conf -Dflume.root.logger=INFO,console

5. On server1, generate a test log message:
echo "test" | nc localhost 5140

6. You can see server2 and server3 receiving the log messages in turn, which shows that the round-robin mode is working.
