赞
踩
Flume 是一个分布式、可靠且高可用的日志收集和聚合系统。它是 Apache 基金会下的一个开源项目,旨在帮助用户轻松地从多个源收集、聚合和移动大量的日志数据。
架构:Flume 的架构包括三个核心组件:Source、Channel 和 Sink。Source 负责从不同的数据源(如日志文件、Kafka、Syslog 等)收集数据,Channel 作为数据的缓冲区,用于将数据传递给 Sink,Sink 将数据发送到最终的目的地(如 HDFS、HBase、Kafka 等)。
数据流:Flume 的数据流通过 Event 来表示,一个 Event 包含了要传输的数据以及一些可选的头部信息。Event 在 Source 和 Channel 之间、Channel 和 Sink 之间进行传递。
Source:Source 是 Flume 的数据输入端,它从各个数据源收集数据并将其转换为 Event,然后发送到 Channel。Flume 提供了多种类型的 Source,如 Avro、Netcat、Spooling Directory 等,以适应不同的数据源。
Channel:Channel 是 Flume 的数据缓冲区,它用于存储从 Source 收集到的 Event,并将其传递给 Sink。Flume 提供了多种类型的 Channel,如 Memory、File、Kafka 等,以适应不同的需求和场景。
Sink:Sink 是 Flume 的数据输出端,它从 Channel 中获取 Event 并将其发送到最终的目的地。Flume 提供了多种类型的 Sink,如 HDFS、HBase、Kafka 等,以支持不同的数据存储和分析需求。
Agent:Agent 是 Flume 的基本工作单元,它是一个独立的进程,包含了一组 Source、Channel 和 Sink 的配置。一个 Agent 可以用于收集和传输特定类型的数据。
事件处理:Flume 支持一些事件处理机制,如拦截器(Interceptors)和转换器(Converters)。拦截器可以用于对 Event 进行预处理、过滤或添加额外的信息,转换器可以用于对 Event 中的数据进行格式转换。
可靠性和高可用性:Flume 提供了一些机制来保证数据的可靠性和高可用性,如事务、重试、故障转移等。可以通过适当的配置来确保数据流的稳定性和可靠性。
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: |
bind | – | 监听的服务器名hostname或者ip |
port | – | 监听的端口 |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: |
command | – | 所使用的系统命令,一般是cat 或者tail |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: |
spoolDir | – | Flume Source监控的文件夹目录,该目录下的文件会被Flume收集 |
配置范例:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
属性 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: |
host | – | 要监听的hostname或者IP地址 |
port | – | 要监听的端口 |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
属性名 | 默认值 | 解释 |
---|---|---|
channels | – | 与Source绑定的channel,多个用空格分开 |
type | – | 组件类型,这个是: |
filegroups | – | 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔 |
filegroups.<filegroupName> | – | 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名 |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
属性名 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 连接的 channel |
type | – | 组件类型,这个是: |
hdfs.path | – | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: |
table | – | 要写入的 Hbase 表名 |
columnFamily | – | 要写入的 Hbase 列族 |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
kafka.bootstrap.servers | – | Kafka Sink 使用的 Kafka 集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用(HA)支持。格式为 hostname:port,多个用逗号分隔 |
配置范例:
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: |
maxBytesToLog | 16 | Event body 输出到日志的最大字节数,超出的部分会被丢弃 |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
属性 | 默认值 | 解释 |
---|---|---|
channel | – | 与 Sink 绑定的 channel |
type | – | 组件类型,这个是: |
hostNames | – | 逗号分隔的hostname:port列表,如果端口不存在,则使用默认的9300端口 |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
配置范例:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
配置范例:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
配置范例:
a1.channels = c1
a1.channels.c1.type = jdbc
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
kafka.bootstrap.servers | – | channel使用的Kafka集群的实例列表,可以是实例的部分列表。但是更建议至少两个用于高可用支持。格式为hostname:port,多个用逗号分隔 |
配置范例:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
属性 | 默认值 | 解释 |
---|---|---|
type | – | 组件类型,这个是: |
配置范例:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。