当前位置:   article > 正文

Kafka集成

kafka集成

第 1 章 集成 Flume

Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于
Flume 的消费者。
在这里插入图片描述

1.1 Flume 生产者

在这里插入图片描述

场景:在服务器当中/opt/module/applog/app.*里有一个文件以app开头的,我们通过flume实时监控它,只要有数据变化我们就能实时监控它,所以要使用taildir source,支持断点续传且时刻可以监控到每一个文件的变化,由于传输的是普通的日志,没有必要追求太高的可靠性,我们可以使用效率较高的memory channel,数据发往kafka中,所以sink只能使用kafka sink

(1)启动 kafka 集群

[chenyunde@hadoop102 ~]$ zk.sh start
[chenyunde@hadoop102 ~]$ kf.sh start

(2)启动 kafka 消费者

[chenyunde@hadoop103 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --topic first

(3)配置 Flume
在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf

[chenyunde@hadoop102 flume]$ mkdir jobs

[chenyunde@hadoop102 flume]$ vim jobs/file_to_kafka.conf

配置文件内容如下

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = 
/opt/module/flume/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

(4)启动 Flume

[chenyunde@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
jobs/file_to_kafka.conf &

(5)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况

[chenyunde@hadoop102 module]$ mkdir applog

[chenyunde@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log

(6)观察 kafka 消费者,能够看到消费的 hello 数据

1.2 Flume 消费者

在这里插入图片描述

(1)配置 Flume
在 hadoop102 节点的 Flume 的/opt/module/flume/jobs 目录下创建 kafka_to_file.conf

[chenyunde@hadoop102 jobs]$ vim kafka_to_file.conf

配置文件内容如下

# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1
# 2 配置 source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# 4 配置 sink a1.sinks.k1.type = logger


# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(2)启动 Flume

[chenyunde@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console

(3)启动 kafka 生产者

[chenyunde@hadoop103 kafka]$ bin/kafka-console-producer.sh –
bootstrap-server hadoop102:9092 --topic first

并输入数据,例如:hello world
(4)观察控制台输出的日志

第 2 章 集成 Flink

Flink 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于
Flink 的消费者。

在这里插入图片描述

1)Flink 环境准备
(1)创建一个 maven 项目 flink-kafka
(2)添加配置文件

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

(3)将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{
   yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{
   yyyy-MM-dd 
HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

(4)在 java 文件夹下创建包名为 com.chenyunde.flink

2.1 Flink 生产者

(1)在 com.chenyunde.flink 包下创建 java 类:FlinkKafkaProd

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/638888
推荐阅读
相关标签
  

闽ICP备14008679号