当前位置:   article > 正文

Flume详解(3)

Flume详解(3)

Host Interceptor

主机拦截器,本质上不是拦截数据,而是在数据的headers中添加一个host字段,可以用于标记数据来源(被收集)的主机。

Host Interceptor可以配置的选项有:

表-22 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是host

preserveExisting

optional

如果host字段已经存在,是否替换。默认是false

useIP

optional

是否使用IP。如果为true,则显示为IP;如果为false,则显示为主机名。默认为true

hostHeader

optional

显示的字段,默认是host

案例:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置Host Interceptor
  9. a1.sources.s1.interceptors.i1.type = host
  10. # 是否使用IP
  11. a1.sources.s1.interceptors.i1.useIP = false
  12. a1.channels.c1.type = memory
  13. a1.sinks.k1.type = logger
  14. a1.sources.s1.channels = c1
  15. a1.sinks.k1.channel = c1

Static Interceptor

静态拦截器,本质上不是拦截器,而是在数据的headers中添加一个指定的字段。

Static Interceptor可以配置的选项有:

表-23 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是static

preserveExisting

optional

如果指定字段已经存在,是否替换。默认是false

key

optional

指定的键。默认是key

value

optional

指定的值,默认是value

案例:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置Static Interceptor
  9. a1.sources.s1.interceptors.i1.type = static
  10. # 指定键
  11. a1.sources.s1.interceptors.i1.key = kind
  12. # 指定值
  13. a1.sources.s1.interceptors.i1.value = test
  14. a1.channels.c1.type = memory
  15. a1.sinks.k1.type = logger
  16. a1.sources.s1.channels = c1
  17. a1.sinks.k1.channel = c1

UUID Interceptor

UUID拦截器,本质上也不是一个拦截器,而是在数据的headers中添加一个id字段,可以用于标记数据的唯一性。

UUID Interceptor可以配置的选项包含:

表-24 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

optional

headers中添加的字段名,默认是id

preserveExisting

optional

如果headers中已经存在id字段,是否替换。默认是true

prefix

optional

在生成的id之前添加前缀

案例:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置UUID Interceptor
  9. a1.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
  10. a1.channels.c1.type = memory
  11. a1.sinks.k1.type = logger
  12. a1.sources.s1.channels = c1
  13. a1.sinks.k1.channel = c1

Search And Replace Interceptor

搜索和替换拦截器,在使用的时候需要指定正则表达式,会根据正则表达式指定的规则,对Event中body部分的数据进行替换。注意,只替换body部分的数据,而不会影响headers中的数据。正则表达式的使用规则和Java中正则表达式的规则是一致的。

Search And Replace Interceptor中可以配置的选项包含:

表-25 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是search_replace

searchPattern

optional

搜索的正则表达式形式

replaceString

optional

替换的形式

charset

optional

body部分的字符集编码,默认是UTF-8

案例:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置Search And Replace Interceptor
  9. a1.sources.s1.interceptors.i1.type = search_replace
  10. # 指定正则表达式
  11. a1.sources.s1.interceptors.i1.searchPattern = [a-z]
  12. # 指定替换后的形式
  13. a1.sources.s1.interceptors.i1.replaceString = *
  14. a1.channels.c1.type = memory
  15. a1.sinks.k1.type = logger
  16. a1.sources.s1.channels = c1
  17. a1.sinks.k1.channel = c1

Regex Filtering Interceptor

正则过滤拦截器,在使用的时候需要指定一个正则表达式,然后根据属性excludeEvents的值来确定过滤方式。如果excludeEvents的值为true,则表示过滤掉符合正则表达式形式的数据,其他数据留下来;如果excludeEvents的值为false,则表示过滤掉不合符正则表达式形式的数据,符合形式的数据留下来。默认情况下,excludeEvents的值为false。

Regex Filtering Interceptor中可以配置的选项包含:

表-26 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是regex_filter

regex

optional

正则表达式

excludeEvents

optional

替换规则,默认为false

案例:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置Regex Filtering Interceptor
  9. a1.sources.s1.interceptors.i1.type = regex_filter
  10. # 指定正则表达式
  11. a1.sources.s1.interceptors.i1.regex = .*[0-9].*
  12. # 指定替换规则
  13. a1.sources.s1.interceptors.i1.excludeEvents = true
  14. a1.channels.c1.type = memory
  15. a1.sinks.k1.type = logger
  16. a1.sources.s1.channels = c1
  17. a1.sinks.k1.channel = c1

Custom Interceptor

在实际开发过程中,如果Flume提供的拦截器无法满足业务需求,那么Flume同样支持自定义拦截器。但是不同于其他组件的地方在于,Flume中的拦截器在启动的时候,是通过内部类Builder来启动,所以拦截器中需要覆盖其内部类。

步骤:

1)定义类实现Interceptor接口,覆盖其中的intercept方法,其他方法可以忽略掉;同时需要定义内部类实现Interceptor.Builder接口,覆盖其中的build方法;如果需要获取配置,那么可以配置configure方法。

  1. package com.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Map;
  8. public class AuthInterceptor implements Interceptor {
  9.     @Override
  10.     public void initialize() {
  11.     }
  12.     @Override
  13.     public Event intercept(Event event) {
  14.         // 获取headers部分
  15.         Map<String, String> headers = event.getHeaders();
  16.         // 判断headers是否包含time或者timestamp字段
  17.         if (headers.containsKey("time") || headers.containsKey("timestamp")) return event;
  18.         // 如果没有,则添加当前的时间戳
  19.         headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
  20.         return event;
  21.     }
  22.     @Override
  23.     public List<Event> intercept(List<Event> events) {
  24.         // 定义新的集合
  25.         List<Event> es = new ArrayList<>();
  26.         // 遍历
  27.         for (Event event : events) {
  28.             es.add(intercept(event));
  29.         }
  30.         return es;
  31.     }
  32.     @Override
  33.     public void close() {
  34.     }
  35.     public static class Builder implements Interceptor.Builder {
  36.         @Override
  37.         public Interceptor build() {
  38.             return new AuthInterceptor();
  39.         }
  40.         @Override
  41.         public void configure(Context context) {
  42.         }
  43.     }
  44. }

2)将定义好的Interceptor打成jar包。

3)将jar包上传到Flume安装目录的lib目录下:

# 进入Flume的lib目录

cd /opt/software/flume-1.11.0/lib/

# 选择jar包,上传

rz

4)回到数据目录,编辑文件:

# 回到数据目录

cd ../data

# 编辑文件

vim authin.properties

在文件中添加:

  1. a1.sources = s1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. a1.sources.s1.type = netcat
  5. a1.sources.s1.bind = 0.0.0.0
  6. a1.sources.s1.port = 8090
  7. a1.sources.s1.interceptors = i1
  8. # 配置Custom Interceptor
  9. a1.sources.s1.interceptors.i1.type = com.flume.interceptor.AuthInterceptor$Builder
  10. a1.channels.c1.type = memory
  11. a1.sinks.k1.type = logger
  12. a1.sources.s1.channels = c1
  13. a1.sinks.k1.channel = c1

5)启动Flume:

flume-ng agent -n a1 -c $FLUME_HOME/conf -f authin.properties -Dflume.root.logger=INFO,console

其他

事务

在Flume中,也存在事务的问题:

图-7 Flume事务

流程如下:

1)Source进行doPut将数据写到临时的缓冲区PutList中;

2)PutList会推送数据给Channel,如果Channel中有足够的位置,则数据推送成功(doCommit),如果Channel中没有位置,则推送失败,进行回滚(doRollback);

3)Channel进行doTake操作将数据写到临时缓冲区TakeList中;

4)将TakeList中的数据通过Sink批量写往目的地;

5)如果写成功了,则执行doCommit操作;如果写失败了,则执行doRollback操作。

执行流程

Flume执行流程如下图所示:

图-8 Flume执行流程

流程如下:

1)Source会先采集数据,然后将数据发送给ChannelProcessor进行处理;

2)ChannelProcessor收到数据处理之后,会将数据交给Interceptor来处理,注意,在Flume允许存在多个Interceptor来构成拦截器链;

3)Interceptor处理完成之后,会交给Channel Selector处理,Selector存在三种模式:replicating、multiplexing和load_balancing。Selector收到数据之后会根据对应的模式将数据交给对应的Channel来处理;

4)Channel处理之后会交给SinkProcessor。SinkProcessor本质上是一个Sinkgroup,包含了三种方式:default,failover和load_balance。SinkProcessor收到数据之后会根据对应的方式将数据交给Sink来处理;

5)Sink收到数据之后,会将数据写到指定的目的地。

扩展:Flume监控

Ganglia概述

Flume的数据流可以通过Ganglia来进行监控。Ganglia是UC Berkeley发起的一个开源的集群监控项目,可以用于监测数以千计的节点性能。

Ganglia的核心主要包含了三个模块:

1)gmond(Ganglia Monitoring Daemon):轻量级的服务,需要安装在每一个需要收集指标数据的主机上。gmond可以用于收集系统指标数据,包含CPU、内存、磁盘、网络以及活跃的进程数量等。

2)gmetad(Ganglia Meta Daemon):用于整合所有的信息,并将这些信息以RRD格式来存储到磁盘上。

3)gweb(Ganglia Web):Ganglia提供的一个可视化工具,本身是使用PHP开发的,提供了WEB页面,在WEB界面中以图标形式来显示集群的运行状态,以及所收集到的不同的指标数据。

Ganglia安装

1)所有节点安装httpd和php服务:

yum -y install httpd php

2)所有节点安装关联依赖:

yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel

3)安装Epel:

yum -y install epel-release

4)在第一个节点上安装gmetad,gmond和gweb:

yum -y install ganglia-gmetad

yum -y install ganglia-gmond

yum -y install ganglia-web

5)在其他节点上安装gmond:

yum -y install ganglia-gmond

6)第一个节点上修改ganglia.conf:

# 编辑文件

vim /etc/httpd/conf.d/ganglia.conf

# 修改内容如下

  1. <Location /ganglia>
  2.   # Require local
  3.   # Require ip 10.1.2.3
  4.   # Require host example.org
  5.   Require all granted
  6. </Location>

7)第一个节点上修改gmetad.conf:

# 编辑文件

vim /etc/ganglia/gmetad.conf

# 修改data_source属性

data_source "flume_cluster" hadoop01

8)所有节点修改gmond.conf文件:

# 编辑文件

vim /etc/ganglia/gmond.conf

# 修改cluster中的属性值

  1. cluster {
  2.   name = "flume_cluster"
  3.   owner = "unspecified"
  4.   latlong = "unspecified"
  5.   url = "unspecified"
  6. }

# 修改udp_send_channel中的属性值

  1. udp_send_channel {
  2.   #bind_hostname = yes # Highly recommended, soon to be default.
  3.                        # This option tells gmond to use a source address
  4.                        # that resolves to the machine's hostname.  Without
  5.                        # this, the metrics may appear to come from any
  6.                        # interface and the DNS names associated with
  7.                        # those IPs will be used to create the RRDs.
  8.   #mcast_join = 239.2.11.71

  # 监控数据发送给hadoop01

  host = hadoop01

  port = 8649

  ttl = 1

}

# 修改udp_recv_channel中的属性值

udp_recv_channel {

  # mcast_join = 239.2.11.71

  port = 8649

  # 接收来自任意连接的数据

  bind = 0.0.0.0

  retry_bind = true

  # Size of the UDP buffer. If you are handling lots of metrics you really

  # should bump it up to e.g. 10MB or even higher.

  # buffer = 10485760

}

9)所有节点关闭selinux:

# 编辑文件

vim /etc/selinux/config

# 修改SELINUX属性的值为disabled

SELINUX=disabled

# 保存退出,重启服务器

reboot

10)所有节点启动gmond:

systemctl start gmond

11)在第一个节点上启动gmetad和httpd:

systemctl start gmetad

systemctl start httpd

12)在浏览器输入http://IP/ganglia,查看Ganglia页面。

图-9 Ganglia界面

监控Flume

1)修改Flume的配置文件:

# 进入Flume的配置目录

cd /opt/software/flume-1.11.0/conf/

# 复制文件

cp flume-env.sh.template flume-env.sh

# 编辑文件

vim flume-env.sh

# 在文件尾部添加

export JAVA_HOME=/opt/software/jdk1.8

export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.root.monitoring.hosts=hadoop01:8649 -Xms100m -Xmx200m"

# 保存退出,生效

source flume-env.sh

2)启动Flume:

# 回到数据目录

cd ../data

# 启动Flume

flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop01:8649

3)属性解释:

表-27 属性解释

属性

解释

ChannelCapacity

Channel的容量

ChannelFillPercentage

Channel的利用率

ChannelSize

Channel的大小

EventPutAttemptCount

Source试图放入Channel的次数

EventPutSuccessCount

Source向Channel成功放入数据的次数

EventTakeAttemptCount

Channel试图向Sink发送数据的次数

EventTakeSuccessCount

Channel向Sink成功发送数据的次数

startTime

起始时间

stopTime

结束时间

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/316655
推荐阅读
相关标签
  

闽ICP备14008679号