These are my personal study notes; they do not explain the basics.
What this post implements:
Monitor /home/hadoop/logs/flume.log on the hadoop01 node; whenever content is appended to that file, send the appended content to port 44444 on hadoop02.
When the hadoop02 node sees messages arriving on port 44444, it pushes them to the flume-kafka topic of the Kafka cluster.
Spark Streaming then performs a streaming word count.
Note:
Adjust the parameters to fit your own environment!
Code:
package com.jtv.sparkStreaming_kafka

// Cluster mode: leave the master to spark-submit.
// Local mode:
val sparkConf = new SparkConf().setAppName("sparkStreaming_kafka_direct_10_HA").setMaster("local[2]")
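Only this fragment of the original program survives in this copy of the post. The following is a minimal sketch, not the author's original code, of what a Kafka 0.10 direct-stream word count with checkpoint-based recovery (the "HA" in the class name) could look like; the 5-second batch interval, the group.id, auto.offset.reset, and the handling of the fourth program argument are assumptions.

package com.jtv.sparkStreaming_kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object sparkStreaming_kafka_direct_10_HA {

  def main(args: Array[String]): Unit = {
    // Expected arguments (see "Program arguments" below):
    // <checkpointDir> <brokerList> <topic> <4>
    // The meaning of the last argument is not shown in the post; it is ignored here (assumption).
    val Array(checkpointDir, brokers, topic, _) = args

    // Recover the StreamingContext from the checkpoint if one exists,
    // otherwise build a fresh one -- this is what makes the job restartable ("HA").
    val ssc = StreamingContext.getOrCreate(
      checkpointDir,
      () => createContext(checkpointDir, brokers, topic))

    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDir: String, brokers: String, topic: String): StreamingContext = {
    // Cluster mode gets the master from spark-submit; uncomment setMaster for local runs.
    val sparkConf = new SparkConf()
      .setAppName("sparkStreaming_kafka_direct_10_HA")
      // .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))   // 5 s batch interval (assumption)
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sparkStreaming_kafka_direct_10_HA",     // assumption
      "auto.offset.reset" -> "latest",                        // assumption
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Kafka 0.10 direct stream: executors read from the brokers directly.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array(topic), kafkaParams))

    // Word count per micro-batch.
    stream.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc
  }
}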
Execution flow:
1. First, create a file on the hadoop01 node: /home/hadoop/logs/flume.log
2. Start Spark, Kafka, ZooKeeper, and HDFS.
3. On the hadoop02 node, run (the avro-kafka.conf file is listed later):
flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/avro-kafka.conf --name agent2 -Dflume.root.logger=INFO,console
4. On the hadoop01 node, run (the exec-avro.conf file is listed later):
flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/exec-avro.conf --name agent1 -Dflume.root.logger=INFO,console
5. On any node of the Kafka cluster (for example, hadoop03), start a Kafka console consumer (the flume-kafka topic is assumed to exist already; a sketch for creating it follows this list):
kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --from-beginning --topic flume-kafka
6. Run the program (either locally or as a packaged jar).
7. The packaged-jar approach needs some dependency jars; download them yourself and place them under /home/hadoop/lib/.
8. In the /home/hadoop/logs directory, run: echo aa bb c >> flume.log
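The steps above assume that the flume-kafka topic already exists (or that the brokers auto-create topics). If it has to be created explicitly, a minimal sketch using the Kafka AdminClient from kafka-clients could look like the following; the object name, partition count, and replication factor are made up for illustration.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateFlumeKafkaTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
      "hadoop01:9092,hadoop02:9092,hadoop03:9092")

    val admin = AdminClient.create(props)
    try {
      // 3 partitions, replication factor 2 -- adjust to your cluster (assumptions).
      val topic = new NewTopic("flume-kafka", 3, 2.toShort)
      admin.createTopics(Collections.singleton(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}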
1. Running locally:
Program arguments:
"/sparkStreaming/direct_kafka_10_HA/" "hadoop01:9092,hadoop02:9092,hadoop03:9092" "flume-kafka" 4
2. Packaging a jar and running on the cluster:
spark-submit command:
spark-submit \
--class com.jtv.sparkStreaming_kafka.sparkStreaming_kafka_direct_10_HA \
--master spark://hadoop02:7077,hadoop03:7077 \
--driver-memory 512m \
--total-executor-cores 3 \
--executor-memory 512m \
--supervise \
--jars /home/hadoop/lib/spark-streaming-kafka-0-10_2.11-2.3.2.jar,\
/home/hadoop/lib/kafka-clients-2.1.1.jar,\
/home/hadoop/lib/metrics-core-2.2.0.jar,\
/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
/home/hadoop/lib/zkclient-0.3.jar \
/home/hadoop/localFile/original-SparkCore-1.0-SNAPSHOT.jar \
/sparkStreaming/direct_kafka_10_HA/ \
hadoop01:9092,hadoop02:9092,hadoop03:9092 \
flume-kafka \
4
Result:
avro-kafka.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2

#define sources
agent2.sources.r2.type = avro
agent2.sources.r2.bind = hadoop02
agent2.sources.r2.port = 44444

#define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

#define sink
agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092,hadoop03:9092
agent2.sinks.k2.topic = flume-kafka
agent2.sinks.k2.batchSize = 4
agent2.sinks.k2.requiredAcks = 1

#bind sources and sink to channel
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2
exec-avro.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

#define sources
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/logs/flume.log

#define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

#define sink
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 44444

#bind sources and sink to channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
pom.xml (all of the jars used for Spark; trim the list as needed):
<?xml version="1.0" encoding="UTF-8"?>
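The pom.xml is cut off in this copy of the post, so the full dependency list cannot be reproduced. As a rough, assumption-based equivalent (sbt instead of Maven, not the author's original file), the dependencies implied by the --jars list in the spark-submit command above could be declared like this:

// build.sbt -- a hedged sbt equivalent of the truncated pom.xml.
// Versions are taken from the jar file names used above; the Scala patch version
// and the "provided" scope for spark-core are assumptions.
name := "SparkCore"
version := "1.0-SNAPSHOT"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                  % "2.3.2" % "provided",
  "org.apache.spark" %% "spark-streaming"             % "2.3.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10"  % "2.3.2",
  "org.apache.kafka" %  "kafka-clients"               % "2.1.1"
)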