赞
踩
Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于
Flume 的消费者。
场景:在服务器当中/opt/module/applog/app.*里有一个文件以app开头的,我们通过flume实时监控它,只要有数据变化我们就能实时监控它,所以要使用taildir source,支持断点续传且时刻可以监控到每一个文件的变化,由于传输的是普通的日志,没有必要追求太高的可靠性,我们可以使用效率较高的memory channel,数据发往kafka中,所以sink只能使用kafka sink
(1)启动 kafka 集群
[chenyunde@hadoop102 ~]$ zk.sh start
[chenyunde@hadoop102 ~]$ kf.sh start
(2)启动 kafka 消费者
[chenyunde@hadoop103 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --topic first
(3)配置 Flume
在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf
[chenyunde@hadoop102 flume]$ mkdir jobs
[chenyunde@hadoop102 flume]$ vim jobs/file_to_kafka.conf
配置文件内容如下
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置 source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json # 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置 sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(4)启动 Flume
[chenyunde@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
jobs/file_to_kafka.conf &
(5)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况
[chenyunde@hadoop102 module]$ mkdir applog
[chenyunde@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log
(6)观察 kafka 消费者,能够看到消费的 hello 数据
(1)配置 Flume
在 hadoop102 节点的 Flume 的/opt/module/flume/jobs 目录下创建 kafka_to_file.conf
[chenyunde@hadoop102 jobs]$ vim kafka_to_file.conf
配置文件内容如下
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1
# 2 配置 source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# 4 配置 sink a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)启动 Flume
[chenyunde@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
(3)启动 kafka 生产者
[chenyunde@hadoop103 kafka]$ bin/kafka-console-producer.sh –
bootstrap-server hadoop102:9092 --topic first
并输入数据,例如:hello world
(4)观察控制台输出的日志
Flink 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于
Flink 的消费者。
1)Flink 环境准备
(1)创建一个 maven 项目 flink-kafka
(2)添加配置文件
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.13.0</version> </dependency> </dependencies>
(3)将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{
yyyy-MM-dd
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{
yyyy-MM-dd
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
(4)在 java 文件夹下创建包名为 com.chenyunde.flink
(1)在 com.chenyunde.flink 包下创建 java 类:FlinkKafkaProd
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。