Flume: Connecting Data with Kafka + Flume

flume+kafka+flume

1. Start the cluster

Start ZooKeeper, Hadoop (on the master node), Kafka, and later Flume.
# 1. On all three nodes, start ZooKeeper and check its status
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status

# 2. On the master node, start Hadoop
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh

# 3. Start Kafka on all three nodes (append -daemon to run it in the background)
cd /usr/kafka/kafka_2.11-2.4.0/
bin/kafka-server-start.sh config/server.properties

At this point ZooKeeper, Hadoop, and Kafka are all up and running.

For Kafka installation and configuration details, refer to the linked post Kafka集群分布式部署与测试 (Kafka cluster distributed deployment and testing).
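
Before moving on, it can help to confirm that the brokers actually answer from the client side. The sketch below is not part of the original setup; it assumes the kafka-python package (pip install kafka-python) is installed.

# check_brokers.py - sanity check that the Kafka cluster is reachable (assumes kafka-python)
from kafka import KafkaConsumer

BROKERS = ["master:9092", "slave1:9092", "slave2:9092"]

# Creating the consumer triggers a metadata request; topics() lists what the cluster knows about
consumer = KafkaConsumer(bootstrap_servers=BROKERS)
print("cluster reachable, existing topics:", consumer.topics())
consumer.close()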


2. Create a Kafka topic

# Create the topic badou_data with 3 partitions and a replication factor of 2
kafka-topics.sh --create --topic badou_data --partitions 3 --replication-factor 2 --zookeeper master:2181,slave1:2181,slave2:2181

# Consume badou_data from the beginning to verify it works (a programmatic check follows below)
./bin/kafka-console-consumer.sh --from-beginning --topic badou_data --bootstrap-server master:9092,slave1:9092,slave2:9092
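
The console consumer above is enough for the tutorial, but the topic layout can also be checked programmatically. A minimal sketch, again assuming kafka-python rather than anything used in the original post:

# verify_topic.py - confirm badou_data exists with the expected 3 partitions (assumes kafka-python)
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["master:9092", "slave1:9092", "slave2:9092"])
partitions = consumer.partitions_for_topic("badou_data")

if partitions is None:
    print("topic badou_data does not exist yet")
else:
    # Should print [0, 1, 2], matching --partitions 3 from the create command
    print("badou_data partitions:", sorted(partitions))
consumer.close()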

3. Edit the Flume configuration

cd /usr/flume/flume-1.7.0
vi conf/flume_kafka.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source: tail the log file with an exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /usr/flume/flume-1.7.0/day6/flume_exec_test.txt

# a1.sinks.k1.type = logger
# Use the Kafka sink instead of the logger sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
# (brokerList/topic are the legacy property names; Flume 1.7 also accepts kafka.bootstrap.servers and kafka.topic)
a1.sinks.k1.brokerList = master:9092
# Kafka topic to write to
a1.sinks.k1.topic = badou_data
# Serializer for the event body
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The Kafka sink points at the broker master:9092. Now start the Flume agent:

cd /usr/flume/flume-1.7.0

./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
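
Once the agent is up, every line appended to flume_exec_test.txt should land in the badou_data topic. Besides the console consumer from step 2, a small programmatic consumer (again a sketch that assumes kafka-python) can be left running to watch the events arrive:

# watch_topic.py - print the events that Flume pushes into badou_data (assumes kafka-python)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "badou_data",
    bootstrap_servers=["master:9092", "slave1:9092", "slave2:9092"],
    auto_offset_reset="earliest",  # read from the beginning, like --from-beginning
    consumer_timeout_ms=10000,     # stop iterating after 10 seconds of silence
)

for record in consumer:
    # Flume delivers the raw line bytes; decode them for display
    print(record.partition, record.offset, record.value.decode("utf-8"))
consumer.close()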

4. Clear the log file and run the Python script

cd /usr/flume/flume-1.7.0/day6
echo '' > flume_exec_test.txt

Run python flume_data_write.py to simulate a backend service writing log records into the log file:

import json
import pandas as pd

# Log file that the Flume exec source is tailing
writeFileName = "/usr/flume/flume-1.7.0/day6/flume_exec_test.txt"
cols = ["order_id", "user_id", "eval_set", "order_number", "order_dow", "hour", "day"]

# Load the orders data, rename the columns and replace missing values with 0
df1 = pd.read_csv('/root/day3/orders.csv')
df1.columns = cols
df = df1.fillna(0)

# Append each row as one JSON record per line, mimicking backend log output
with open(writeFileName, 'a+') as wf:
    for idx, row in df.iterrows():
        d = {}
        for col in cols:
            # .item() turns numpy scalars into plain Python values so json can serialize them
            d[col] = row[col].item() if hasattr(row[col], 'item') else row[col]
        js = json.dumps(d)
        wf.write(js + '\n')
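
The original script imports random and time without using them, which suggests the intent was to pace the writes so the file grows like a live log rather than being dumped all at once. A hedged variant along those lines (the sleep interval is an arbitrary choice, not something from the original post):

# flume_data_write_slow.py - same output as above, but paced to look like a live log stream
import json
import random
import time

import pandas as pd

writeFileName = "/usr/flume/flume-1.7.0/day6/flume_exec_test.txt"
cols = ["order_id", "user_id", "eval_set", "order_number", "order_dow", "hour", "day"]

df = pd.read_csv('/root/day3/orders.csv')
df.columns = cols
df = df.fillna(0)

with open(writeFileName, 'a+') as wf:
    for _, row in df.iterrows():
        d = {col: (row[col].item() if hasattr(row[col], 'item') else row[col]) for col in cols}
        wf.write(json.dumps(d) + '\n')
        wf.flush()                             # make the line visible to tail -f right away
        time.sleep(random.uniform(0.01, 0.1))  # wait 10-100 ms between records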

In the Kafka console consumer you can see the records that the Python script writes to flume_exec_test.txt streaming in continuously, confirming that the file → Flume → Kafka pipeline works end to end.
