赞
踩
Flume vs Logstash vs Filebeat
当时选择数据采集工具时,我们主要参考了市面上热度比较高的Flume和Logstash还有Filebeat,据目前所知,美团和苏宁用的是Flume。
Flume当初的设计初衷就是将数据传送到HDFS中,它更加地注重数据的传输,而Logstash是ELK组件(Elastic Search、Logstash、Kibana)中的一员,侧重于数据预处理。
Flume比Logstash多了一个可靠性策略,在Flume中传输的数据会持久化在channel中,除非已确认数据被传输到了下一位置,即sink端,否则不会删除,这个过程是通过事务来控制的,这样的设计使得可靠性非常好。
Filebeat是一个轻量型日志采集工具,虽能更稳健少宕机,但它设计目的是写Logstash和ES,主要处理轻量级数据。
可能数据重复。
例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。
解决:我们可以在搭建数仓的dwd层做去重。
Sources:
① Taildir Source:断点续传、多目录
② Exec Source:监测文件中新增内容
③ Spooldir Source:监测目录下新增的文件
④ Avro Source 用于Flume的级联场景
Channels:
为什么要channel?
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
① File Channel:将所有事件写到磁盘,数据存储在磁盘中,宕机数据可以保存。传输速率慢,适合对数据传输可靠性要求高的场景,例如:金融行业
② Memory Channel:是内存中的队列,数据存储在内存中,容易受到容量限制,无法持久化,宕机数据容易丢失。传输效率快,适合对数据传输可靠性要求不高的场景,例如:日志数据
③ Kafka Channel:减少Flume的Sink阶段,提高了传输效率
Flume的通道选择器Channel Selectors
复制和多路选择
默认是Replicating Channel Selector,将从Source过来的events发往所有Channel
Multiplexing则可以选择发往哪些Channel
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing结构,Multiplexing的原理是,根据event中Header(kv键值对类型)的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key对应value值赋予不同的值。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。
@Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); if (body[0] < 'z' && body[0] > 'a') { event.getHeaders().put("type", "letter"); } else if (body[0] > '0' && body[0] < '9') { event.getHeaders().put("type", "number"); } return event; }
Sinks:
① HDFS Sink:推送数据到HDFS,可能产生大量小文件问题
② Avro Sink:用于Flume的级联场景
③ File Roll Sink:存放在外部文件系统中,每隔指定时长生成文件并且保存在这段时间内收集到的日志数据
④ HBase Sink:存储在HBase中
⑤ Logger Sink:用于调试
我们是使用第三方框架Ganglia实时监控Flume,当然,你也可以使用Zabbix对Flume监控,但是这种方式需要在Flume源码中添加监控模块,相对较麻烦。
如果Souce -> Channel传输了的消息数,即put数,和Channel -> Sink传输了的消息数,即take数,两者消息量的偏差过大即说明Flume需要做调整。可以先在启动Flume Agent时增加两个参数来指定生成的报告类型和服务器端口查看报告。
参考:http://lxw1234.com/archives/2018/02/899.htm
选型:
Taildir Source:可监测多目录,且支持断点续传
实现断点续传:保持偏移量到某个文件,当故障重启后先读取该文件获取上一次读取位置后读数据。
File Channel:传输比Memory Channel慢,但是宕机后可以数据保存,不丢失。
HDFS Sink:传送数据到HDFS,如果是HBase使用Hbase Sink。
自定义拦截器的步骤
四步走:
① 实现Interceptor拦截器的接口
② 重写四个方法:
initialize 初始化
public Event intercept(Event event) 处理单个Event
public Listintercept(Listevents) 处理多个event方法,在这个方法中调用Event intercept(Event event)
③ close 方法
④ 静态内部类,实现interceptor.Builder
这里增加了两个拦截器进行串联,但性能上会有点损耗,
ETL拦截器:数据清洗,去除不符合Json格式的数据,
① 获取数据,数据都存在body中:使用event.getBody()
② 解析Json:JSON.parseObject(json)
TimeStamp时间戳拦截器:把事件时间加入到header中,以事件时间而不是系统处理时间分区,解决日志数据的时间跨天的问题。
① 获取数据,数据都存在body中:使用event.getBody()
② 解析Json:JSON.parseObject(json)
获取时间戳:Long ts = obj.getLong(“ts”);
③将时间戳写入header
Map<String, String> headers = event.getHeaders();
// 要求传入字符串,但解析得到的ts为Long类型,故使用+"",即用字符串拼接方法转换成字符串
headers.put(“timestamp”,ts+"");
④ 数据返回
return event;
Flume的事务机制(类似数据库的事务机制):
Flume使用两个独立的事务分别负责从Source到Channel(put事务),以及从Channel到Sink(take事务)的事件传递。
比如Spooling Directory Source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到Channel且提交成功,那么Source就将该文件标记为完成。同理,事务以类似的方式处理从Channel到Sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚且所有的事件都会保持到Channel中,等待重新传递。
不会,Channel通道可以选择File Channel,数据传输自身有事务机制保证。
根据Flume的架构原理,Flume是不可能丢失数据的,其内部有完善的事务机制,Source到Channel是事务性的,Channel到Sink是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用MemoryChannel,Agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。
Flume不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。
Event = Header + Body
答:它是数据流的基本单元,由一个装载数据的字节数组(Byte Array)Body和一系列可选的字符串属性来组成(可选头部记Header)
参考:https://www.pianshen.com/article/46741071230/
实际开发在flume-env.sh中设置JVM Heap为4G或更高,部署在单独的服务器上面。
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果不一致容易导致频繁Full GC。
flume-env.sh配置的是公共的agent堆大小,即适用所有,但是如果想要给某个agent单独配置,则发送命令启动脚本时可指定两参数-Xmx -Xms。
-Xmx:设置JVM最大可用内存。
-Xms:设置JVM初始内存。初始内存-Xms可以设置成跟-Xmx最大可用内存相同,以避免每次垃圾回收完成后JVM重新分配内存。
① Source
增加Source个数
使用Tairdir Source时可增加FileGroups个数以增大Source读取数据的能力。例如:当某一个目录下产生的新文件过多时,我们就可以将这个文件目录拆分成多个文件目录,同时配置好多个Source 以保证Source有足够的能力获取到新产生的数据。
batchSize参数决定Source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。
② Channel
type 选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据。type选择file时Channel的容错性更好,但是性能上会比memory channel差。
使用file Channel时dataDirs配置多个不同盘下的目录可以提高性能。
checkpointDir和backupCheckpointDir也尽量配置在不同的硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。
Capacity参数决定Channel可容纳最大的event条数。transactionCapacity参数决定每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数。transactionCapacity需要大于Source和Sink的batchSize参数。
Flume数据堆压问题
capacity >= transactionCapacity >= batchSize
batchSize是针对Source和Sink而言的,用来限制Source和Sink对event的批量处理。
capacity容量都是对于channel而言的。
如果出现数据堆压,先调用Flume事务的doRollBack方法回滚,清空putList,接着重新采集。
③ Sink
增加Sink的个数可以增加Sink消费event的能力。Sink也不是越多越好够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。
batchSize参数决定Sink一次批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬出event的性能。
参考:https://blog.csdn.net/weixin_43990496/article/details/106306470
一、影响:
① 元数据层:每个小文件都有一个元数据,这些信息都保存在Namenode内存中。小文件过多会占用Namenode服务器大量的内存,影响Namenode性能和使用寿命
② 计算层面:默认情况下MR会对小文件启用一个MR任务计算,非常影响计算性能,同时也影响磁盘寻址时间
二、处理:
Sink接收到数据先生成了一个.tmp临时文件,当达到设定参数值才会截断生成一个正式的文件。
官方默认三个参数配置写入HDFS后会产生小文件,
hdfs.rollInterval 滚动时间的间隔
hdfs.rollSize 滚动文件大小
hdfs.rollCount 滚动事件个数
几个参数作用如下:
rollSize:即tmp文件在达到多少兆时会滚动生成正式文件
rollInterval:即tmp文件创建超多少秒时会滚动生成正式文件
rollCount:即tmp文件在达到多少个事件时会滚动生成正式文件
解决:
将这几个参数的值设置为0,不启用roll滚动
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 0
改用round一轮一轮制
例如我们打算对时间戳根据分钟以每1分钟为单位进行四舍五入。
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
使用官方提供的Flume-Kafka插件,插件的实现方式是自定义了Flume的 Sink,将数据从Channel中取出,通过Kafka的producer写入到Kafka中,可以自定义分区等。
① Flume采集日志是通过流的方式直接将日志收集到存储层,而Kafka是将数据缓存在Kafka集群中,待后期可以采集到存储层。
② Flume采集中间停了,可以采用文件的方式记录之前的日志,而Kafka是采用offset偏移量的方式记录之前的日志。
Flume -> Kafka -> HDFS -> MR计算
Kafka作为消息中间件主要作用是解耦。因为在不同的系统之间的融合处往往数据生产速率和消费速率不相同,这时可以在这些系统之间加Kafka缓存。例如线上数据需要输入HDFS,线上数据生产快且具有突发性,如果直接连接上HDFS(kafka-consumer)可能会使得高峰时间hdfs数据写失败,这种情况你可以把数据先写到Kafka,然后从Kafka中导入到hdfs。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。