赞
踩
数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。
https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
通过xshell把安装包上传虚拟机
解压文件,修改文件名称
- 1.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt
-
- 2.sudo mv ./apache-flume-1.7.0-bin ./flume
-
- 3.sudo chown -R root:root ./flume
输入:
sudo vim ~/.bashrc
在文件里面添加
(根据你自己的安装路径)
- export JAVA_HOME=/opt/jdk-1.8;
-
- export FLUME_HOME=/opt/flume
-
- export FLUME_CONF_DIR=$FLUME_HOME/conf
-
- export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc
- cd /opt/flume/conf
-
- cp ./flume-env.sh.template ./flume-env.sh
修改flume-env.sh文件
在文件开头增加一行,设置JAVA_HOME
- vim flume-env.sh
-
-
- export JAVA_HOME=/opt/jdk-1.8;
/opt/flume/bin/flume-ng version
Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。
cd /opt/flume/conf
vim avro.conf
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- #定制source,绑定channel、主机以及端口
- a1.sources.r1.type = avro
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
-
- #描述并配置sinks组件
- a1.sinks.k1.type = logger
-
- #描述并配置channels
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- #将sources和sink通过同一个channel连接绑定
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
上面Avro Source参数说明如下:
Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。
bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。
a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
port表示绑定的端口。
a1.sources.r1.port = 4141,表示绑定的端口是4141。
a1.sinks.k1.type = logger,表示sinks的类型是logger。
这个终端不要关闭
- cd /opt/flume
-
- echo “Hello World”>> ./helloworld.txt
/opt/flume/bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /opt/flume/helloworld.txt
执行之后,我们就可以在前面不让关闭的那个终端看到Hello World了
在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。
- cd /opt/flume
-
- vim netcat.conf
- #/usr/local/flume/conf/netcat.conf
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- #同上,记住该端口名
-
- # Describe the sink
- a1.sinks.k1.type = logger
-
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
cd /opt/flume/conf
不要关闭这个终端
/opt/flume/bin/flume-ng agent --conf ./conf --conf-file netcat.conf --name a1 -Dflume.root.logger=INFO,console
输入:
telnet localhost 44444
或者
nc localhost 44444
#在这个终端输入字符串就可以显示在前面那个终端里了,但是中文是不支持的,显示长度也有限
如果报出下面的错误:
解决方法:
安装telnet插件
yum install telnet
在前面的终端查看数据
Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。
- cd /opt/flume/conf
-
- vim flume-to-spark.conf
把以下内容输入文件
- #flume-to-spark.conf: A single-node Flume configuration
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 33333
-
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = localhost
- a1.sinks.k1.port =44444
-
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000000
- a1.channels.c1.transactionCapacity = 1000000
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
#说明:
1.Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce
2.Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。
#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。
下载spark-streaming-flume_2.11-2.3.4.jar
官网:
https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume
镜像网站(下载速度较快)
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11/
把这个jar文件放到/opt/spark/jars/flume目录下
把文件拖进去就行了
- cd /usr/local/spark/jars
-
- mkdir flume
vim spark-env.sh
添加以下内容
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/opt/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/opt/spark/jars/kafka/*:/usr/local/kafka/libs/*:/opt/spark/jars/flume/*:/opt/flume/lib/*
- cd /opt/spark-2.4.0
-
- mkdir test
-
- cd test
-
- vim FlumeEventCount.py
- from __future__ import print_function
-
- import sys
-
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- from pyspark.streaming.flume import FlumeUtils
- import pyspark
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
- exit(-1)
-
- sc = SparkContext(appName="FlumeEventCount")
- ssc = StreamingContext(sc, 2)
-
- hostname= sys.argv[1]
- port = int(sys.argv[2])
- stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
- stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()
-
- ssc.start()
- ssc.awaitTermination()
注意:可能需要安装pyspark,命令为:
pip3 install pyspark
./bin/spark-submit --driver-class-path /opt/spark-2.4.0/jars/*:/opt/spark-2.4.0/jars/flume/* ./test/FlumeEventCount.py localhost 44444
启动一个新的终端
cd /opt/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console
启动一个新的终端
telnet localhost 33333
现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息
结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。