当前位置:   article > 正文

Flume断点续传深入研究_flume 断点续传 java

flume 断点续传 java
方法一:在excel source中运用复杂的tail命令
在百度中搜索到一篇文章:https://my.oschina.net/leejun2005/blog/288136

可以在tail传的时候记录行号,下次再传的时候,取上次记录的位置开始传输,类似:

agent1.sources.avro-source1.command = /usr/local/bin/tail  -n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F  /home/storm/tmp/id.txt | awk 'ARNGIND==1{i=$0;next}{i++; if($0~/文件已截断/)i=0; print i >> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n -
需要注意如下几点:
(1)文件被rotation的时候,需要同步更新你的断点记录“指针”,
(2)需要按文件名来追踪文件,
(3)flume挂掉后需要累加断点续传“指针”
(4)flume挂掉后,如果恰好文件被rotation,那么会有丢数据的风险,只能监控尽快拉起或者加逻辑判断文件大小重置指针。
(5)tail 注意你的版本,请更新coreutils包到最新。


于是乎:
[hadoop@h71 ~]$ cat data.txt 

Jan 18 22:25:55 192.168.101.254 s_sys@hui syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 s_sys@hui system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 s_sys@hui kernel: 
Feb 21 10:24:33 192.168.101.254 s_sys@hui kernel: 
[hadoop@h71 ~]$ cat n
0

[hadoop@h71 ~]$ echo "Feb 21 10:24:33 192.168.101.254 s_sys@hui kernel: " >> data.txt
[hadoop@h71 ~]$ cat n
1
2
3
4
5
6
7


[hadoop@h71 conf]$ cat hehe.sh
/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
tail -n +0 -F /home/hadoop/data.txt |awk 'ARNGIND==1{next}{i++;print i > "/home/hadoop/n"}' /home/hadoop/data.txt
(一开始我还担心第二条命令也会和第一条命令一并执行,那这样的话n文件就不能记录flume进程挂掉后的行数了啊,最后还好经过试验知道第二条命令在第一条命令执行完成后才执行)


[hadoop@h71 conf]$ cat hbase_simple.conf 

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.8.71
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = hbase
a1.sinks.k1.table = messages
a1.sinks.k1.columnFamily = host
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = com.tcloud.flume.AsyncHbaseLogEventSerializer
a1.sinks.k1.channel = memoryChannel

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channe
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[hadoop@h71 conf]$ sh hehe.sh
但是:
tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt这条指令在Linux中好使,在flume中却不好使,难道是$的原因吗。。。
在Linux中:
[hadoop@h71 conf]$ tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt
Jan 18 22:25:55 192.168.101.254 s_sys@hui syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 s_sys@hui system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 s_sys@hui kernel: 
在flume中sink能开启,但是source端却抽不到数据。。


无奈之下我只能换了思路,那就是每次flume进程因故障挂掉的时候,去查看n文件的最后一行数字就是flume进程结束时停止的行数,然后再手动去修改hbase_simple.conf中的
a1.sources.r1.command = tail -n +0 -F /home/hadoop/data.txt   (-n后面的参数就是查看n文件最后一行的数字再加一)
比如n文件的最后一行是7,那么再启动flume的时候就得将hbase_simple.conf中的a1.sources.r1.command改为tail -n +8 -F /home/hadoop/data.txt
哎,但是我感觉这种方法好笨啊,虽然是无奈之举。。


额,后来又想到这样还不能在a1.sources.r1.command中写入两个文件了,如果写入两个文件的话会出现这种情况(无法分开统计,成了累加了):
[hadoop@h71 ~]$ tail -n +0 -F /home/hadoop/data.txt /home/hadoop/data2.txt |awk 'ARNGIND==1{next}{i++;print i > "/home/hadoop/n"}' /home/hadoop/data.txt /home/hadoop/data2.txt
[hadoop@h71 ~]$ cat n
1
2
3
4
5
6
7
8
9
10
11
12
[hadoop@h71 ~]$ tail -n +0 -F /home/hadoop/data.txt /home/hadoop/data2.txt

==> /home/hadoop/data.txt <==
Jan 18 22:25:55 192.168.101.254 s_sys@hui syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 s_sys@hui system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 s_sys@hui kernel: 
Feb 21 10:24:33 192.168.101.254 s_sys@hui kernel: 
Feb 21 10:24:33 192.168.101.254 s_sys@hui kernel: 

==> data2.txt <==
Feb 01 05:54:55 192.168.101.254 s_sys@hui trafficlogger: empty map for 1:4097 in classnames
Jan 23 20:07:00 192.168.101.254 s_sys@hui trafficlogger: empty map for 1:4097 in classnames
Jan 29 06:29:39 h107 rsyslogd-2007: action 'action 17' suspended, next retry is Sun Jan 29 06:30:09 2017 [try http://www.rsyslog.com/e/2007 ]
Jan 29 20:07:01 192.168.101.254 s_sys@hui trafficlogger: empty map for 1:4097 in classnames
参考地址:http://www.cnblogs.com/zhzhang/p/5778836.html

后来发现在Linux下这个命令真的挺好用的,如果flume好使的话真的是不错,奈何flume不好使
a1.sources.r1.command = tail -n +$(tail -n1 /home/hadoop/n) -F /home/hadoop/data.txt 2>&1 | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/home/hadoop/n";fflush("")}' /home/hadoop/n -
并且需要在首次启动flume进程时把n文件内容设置为1,如果设置为0的话第二次启动的时候读取的行数不是结束时的行数。


后来在群里问有人说flume不一定可以理解这么复杂的shell的,人家说exec是Java程序而已,又不是shell终端,普通的tail -f也只是给flume数据流而已,需要看看exec的源码,估计也只是从shell命令拿到数据流而已


方法二:自己编写source端
参考:github地址:https://github.com/cwtree/flume-filemonitor-source


先在hbase中建立相应的表:
hbase(main):175:0> create 'messages','cf','host'


flume配置:
[hadoop@h71 conf]$ cat chiwei.conf 

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.chiwei.filemonitor.FileMonitorSource
a1.sources.r1.channels = c1
a1.sources.r1.file = /home/hadoop/messages
a1.sources.r1.positionDir = /home/hadoop

a1.sinks.k1.type = logger
a1.sinks.k1.type = hbase
a1.sinks.k1.table = messages
a1.sinks.k1.columnFamily = host
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = com.tcloud.flume.AsyncHbaseLogEventSerializer
a1.sinks.k1.channel = memoryChannel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
它会监测那个文件的变化的,并且周期性的将当前写过的位置记录到position.log文件中,按字节处理的;如果进程挂了,下次重新启动,它会自动按照position.log最后记录的字节位置开始往后写到下游的

这个有个弊端:
就是再次启动flume进程的时候当意外停止进程到启动进程之间有一行日志输入我在sink端可以匹配空白行解决,但是如果期间有多行导入的时候就无能为力了,这期间的数据就会丢失了,启动之后日志如果是一行一行的输入也没事,就怕一次导入多行我这个就不灵了。我感觉得修改source端的源码让数据一行一行的读取,而不是一起读取。


问题再现:

Jan 31 10:24:36 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn
Jan 31 10:24:37 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn
Jan 31 10:24:38 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn
Jan 31 10:24:39 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn(一次输入多行导入hbase失败)

null
Jan 31 10:24:40 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn(一次输入一行导入hbase成功)

com.tcloud.flume.AccessLog@150cf6c
20170131102440
Jan 31 10:24:41 192.168.101.254 s_sys@hui kernel: External interface ADSL is downn

com.tcloud.flume.AccessLog@cb6c33
20170131102441
解决:
我在AsyncHbaseLogEventSerializer中添加了数组遍历,而不是在正则中匹配空行,这才解决了问题。。。

12/12/13 18:13:04 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
Jan 18 22:25:55 192.168.101.254 s_sys@hui syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 s_sys@hui system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
Jan 31 10:24:28 192.168.101.254 s_sys@hui kernel: External interface ADSL is down

Jan 18 22:25:55 192.168.101.254 s_sys@hui syslog-ng[69]: STATS: dropped 0
com.tcloud.flume.AccessLog@150cf6c
20170118222555
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
com.tcloud.flume.AccessLog@1ee970
20170131102427
Jan 31 10:24:31 192.168.101.254 s_sys@hui system: ||||IPSec event unroute-client on tunnel to gz
com.tcloud.flume.AccessLog@16fe335
20170131102431
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
com.tcloud.flume.AccessLog@adca08
20170202062601
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
com.tcloud.flume.AccessLog@1ab04ce
20170220062504
Jan 31 10:24:27 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
com.tcloud.flume.AccessLog@c889db
20170131102427
Jan 31 10:24:28 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
com.tcloud.flume.AccessLog@1d2b352
20170131102428
Jan 31 10:24:29 192.168.101.254 s_sys@hui kernel: External interface ADSL is down

Jan 31 10:24:29 192.168.101.254 s_sys@hui kernel: External interface ADSL is down
com.tcloud.flume.AccessLog@1913b29
20170131102429
代码已上传 http://download.csdn.net/download/m0_37739193/10154916


总结:

上面两个方法是在flume的旧版本的时候没有断点续传功能的时候大家的一种大胆的尝试,可是毕竟大家水平有限而且都是个人在搞,所以都存在一定的缺陷。直到flume1.7推出了TaildirSource组件后(大家可以参考我的另一篇文章http://blog.csdn.net/m0_37739193/article/details/72962192),终于实现了断点续传功能,而且人家肯定是比较好的(有团队组织在维护,里面有大牛在哈,人家肯定是经过了反复的测试研究后才发布的)。上面两个方法就是为了给大家打开思路,提供参考而已,感觉思想还是很不错的。最后就是,既然已经有了现成好用的东西了,你也没必有再自己造轮子了,而且你造的轮子未必有人家的好。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/88973
推荐阅读
相关标签
  

闽ICP备14008679号