当前位置:   article > 正文

采用Flume-kafka-Flume将数据导入到HDFS中_采用多路复用模式,flume接收数据注入kafka 的同时,将数据备份到hdfs目录/user/te

采用多路复用模式,flume接收数据注入kafka 的同时,将数据备份到hdfs目录/user/tes

首先先去官网下载flume
http://flume.apache.org/download.html

安装Flume,然后进行配置

下载完成后,解压

tar -zxvf apache-flume-1.9.0-bin
  • 1

解压完成之后,进入conf目录下面,把flume-env.sh.template文件更改为flume-env.sh,然后进入flume-env.sh配置jdk的路径。

 export JAVA_HOME=/data/java/jdk1.8.0_11
  • 1

然后进入bin目录,执行命令,验证flume是否安装成功

flume-ng version
  • 1

如果能正常输出版本信息,则表示flume安装成功了
在这里插入图片描述

然后自定义一个conf文件,我是在conf目录下面建的(这个文件在哪里建都可以,只要启动flume的时候,指定这个配置文件的路线就行了)。新增一个flume-to-kafka.conf的文件,然后配置:

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
#监控文件所在路径
a1.sources.r1.filegroups.f1 = /data/module/applogs/log/app.*

#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

#kafka的安装地址
a1.channels.c1.kafka.bootstrap.servers = 192.168.0.110:9092
a1.channels.c1.kafka.topic = flume
a1.channels.c1.parseAsFlumeEvent = false

#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

配置完成之后,我们启动测试一下,进入到bin目录,执行命令,启动flume

flume-ng agent --name a1 -f /data/flume/apache-flume-1.9.0-bin/conf/flume-to-kafka.conf -Dflume.root.logger=INFO,console
  • 1

启动之后,看到flume正常读取了我们指定目录下面的,前缀为app的文件,代表flume配置已经完成.
在这里插入图片描述

kafka安装

接下来是安装kafka,安装包可以去官网下载 http://kafka.apache.org/downloads

下载完成之后,解压

tar -zxvf kafka_2.12-2.7.0.tgz
  • 1

解压完成之后,进入config目录,修改server.properties文件

broker.id=0
port=9092
host.name=192.168.0.110
#指定日志输出目录
log.dirs=/data/kafka/kafka-logs
#zookeeper集群地址,这里zk我装了三个
zookeeper.connect=192.168.0.110:2181,192.168.0.111:2181,192.168.0.112:2181
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

修改完成之后,进入到bin目录,启动kafka。

kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.7.0/config/server.properties
  • 1

因为第一步的时候,我们已经安装好了flume,当我们启动kafka的时候,我们可以创建一个消费者,去消费,从flume推送过来的。存在kafka里面的数据,也就是消费kafka的数据。注意一下,这个topic 后面的flume是之前配置flume的时候,指定的kafka中topic的名称,名称要对

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.110:9092 --from-beginning --topic flume
  • 1

启动之后,如果页面上面出现,打印了之前flume目录指定下的文件信息 ,则flume-kafka的通道是通的。

安装第二个flume

安装的步骤跟之前的 一直,只是这个时候,我们需要新建一个kafka-to-hdfs.conf文件,从kafka读取数据到hdfs。

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
#kafka的安装地址,如果是多个kafka,这里用,分割,指定多个地址就可以了
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.110:9092
a1.sources.r1.kafka.topics=flume

## channel1
a1.channels.c1.type = file
#数据备份目录
a1.channels.c1.checkpointDir = /data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /data/flume/behavior1/

## sink1
a1.sinks.k1.type = hdfs

#hdfs地址
a1.sinks.k1.hdfs.path = hdfs://192.168.0.109:9000/data/flume/log/flume_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

配置完成之后,我们启动这个flume去消费kafka里面的数据,进入到bin目录,执行

flume-ng agent --name a1 -f /data/flume/apache-flume-1.9.0-bin/conf/kafka-to-hdfs.conf -Dflume.root.logger=INFO,console
  • 1

执行完成之后,可以看见hdfs里面已经生成了指定的文件

在这里插入图片描述
自学大数据的路中,建了一个交流群,678513209。有转大数据的小伙伴,加群一起交流

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/504916
推荐阅读
相关标签
  

闽ICP备14008679号