Simulate log4j log production and send the logs to a Flume server; Flume then forwards the log records to Kafka, from which Spark Streaming reads them and performs some simple processing.
Concretely: log4j writes the logs in a fixed format and pushes them to a specific port on the Flume server, Flume delivers them to Kafka, and Spark Streaming consumes from Kafka and processes the stream.
(1) In IDEA, create a java folder under the test directory and mark it as test sources.
(2) Define the log4j log format and hook it up to Flume
-》 Add a resources folder under test and create log4j.properties
- log4j.rootCategory=INFO,stdout,flume
- # log4j console output format
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.target=System.out
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] [%C] [%p] - %m%n
-
-
- # flume appender: where log4j sends its output
- log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
- log4j.appender.flume.Hostname = hadoop
- log4j.appender.flume.Port = 41414
- log4j.appender.flume.UnsafeMode = true
What this achieves: (a) defines the log production format, and (b) sends the output to a specific port on the Flume server, i.e. ties log4j to Flume.
[See the official docs: http://flume.apache.org/FlumeUserGuide.html, search for "Log4J Appender"]
- Log format:
- 2018-09-23 12:13:52 [main] [LoggerGenerator] [INFO] - current value is :0
- 2018-09-23 12:13:54 [main] [LoggerGenerator] [INFO] - current value is :1
- 2018-09-23 12:13:55 [main] [LoggerGenerator] [INFO] - current value is :2
(3) Add the dependency
- <dependency>
- <groupId>org.apache.flume.flume-ng-clients</groupId>
- <artifactId>flume-ng-log4jappender</artifactId>
- <version>1.6.0</version>
- </dependency>
(4) Re-run the Java program LoggerGenerator
- import org.apache.log4j.Logger;
- 
- public class LoggerGenerator {
-     private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
- 
-     public static void main(String[] args) throws Exception {
-         int index = 0;
-         // emit one log line every 100 ms; the flume appender configured in
-         // log4j.properties ships each line to the Flume agent
-         while (true) {
-             Thread.sleep(100);
-             logger.info("current value is :" + index++);
-         }
-     }
- }
(1) Flume configuration file streaming2.conf
- agent1.sources=avro-source
- agent1.channels=logger-channel
- agent1.sinks=kafka-sink
-
- #define source
- agent1.sources.avro-source.type=avro
- agent1.sources.avro-source.bind=0.0.0.0
- agent1.sources.avro-source.port=41414
-
- #define channel
- agent1.channels.logger-channel.type=memory
-
- #define sink
- agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
- agent1.sinks.kafka-sink.topic = streamingtopic
- agent1.sinks.kafka-sink.brokerList = hadoop:9092
- agent1.sinks.kafka-sink.requiredAcks = 1
- agent1.sinks.kafka-sink.batchSize = 20
-
- agent1.sources.avro-source.channels=logger-channel
- agent1.sinks.kafka-sink.channel=logger-channel
(2) Start Flume [do not start it yet, because Kafka is not running; the agent itself starts without errors, but it will start throwing errors as soon as data arrives]
bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console
(1) Start ZooKeeper
(2) Start the Kafka server
bin/kafka-server-start.sh -daemon config/server.properties
(3) Create the topic
bin/kafka-topics.sh --create --topic streamingtopic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1
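To double-check that the topic exists, the topic list can be queried against the same ZooKeeper path (the /kafka08 chroot matches the create command above):
bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08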
(4) Run a simple test to verify the flow from the logs into Kafka
-》 Start Flume
bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console
-》 Start a Kafka console consumer
bin/kafka-console-consumer.sh --topic streamingtopic --zookeeper hadoop:2181/kafka08
(Tested and confirmed working.)
(1) Code
- package Spark
- 
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.dstream.ReceiverInputDStream
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- 
- /**
-  * Spark Streaming consuming from Kafka
-  */
- object KafkaStreamingApp_product {
-   def main(args: Array[String]): Unit = {
-     if (args.length != 4) {
-       System.err.println("Usage: KafkaStreamingApp_product <zkQuorum> <group> <topics> <numThreads>")
-       System.exit(1)
-     }
- 
-     val Array(zkQuorum, group, topics, numThreads) = args
- 
-     // setMaster is only for local testing; for a production run, comment it
-     // out and pass the master through spark-submit instead
-     val sparkConf = new SparkConf().setAppName("KafkaStreamingApp_product")
-       .setMaster("local[2]")
- 
-     // 5-second batch interval
-     val ssc = new StreamingContext(sparkConf, Seconds(5))
- 
-     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
- 
-     // TODO: how Spark Streaming hooks up to Kafka
-     // see the createStream source for reference
-     val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
- 
-     // each record is a (key, value) pair; the log line is the second element
-     // messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
-     messages.map(_._2).count().print()
- 
-     ssc.start()
-     ssc.awaitTermination()
-   }
- }
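If a plain count is not enough, the log format defined in log4j.properties can be parsed inside the same pipeline. Below is a minimal, hypothetical sketch that would replace the messages.map(_._2).count().print() line above and count records per log level; the field index assumes the ConversionPattern %d [%t] [%C] [%p] - %m%n shown earlier:
- // hypothetical variant: count log lines per level instead of a total count
- messages.map(_._2)
-   .map { line =>
-     // "2018-09-23 12:13:52 [main] [LoggerGenerator] [INFO] - ..." splits on "[" and "]"
-     // into timestamp, thread, class, level, message; the level sits at index 5
-     val parts = line.split("\\[|\\]")
-     val level = if (parts.length > 5) parts(5) else "UNKNOWN"
-     (level, 1)
-   }
-   .reduceByKey(_ + _)
-   .print()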
(2) Run configuration: add the program arguments
hadoop:2181/kafka08 test streamingtopic 1
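For a production run, the .setMaster("local[2]") line would be removed and the application submitted with spark-submit instead; a rough sketch, where the jar path and the spark-streaming-kafka package coordinates (Spark 1.6.2 / Scala 2.10 assumed here) need to be adapted to your own build and cluster master:
bin/spark-submit --class Spark.KafkaStreamingApp_product --master local[2] --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 /path/to/your-app.jar hadoop:2181/kafka08 test streamingtopic 1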
1. Start ZooKeeper
2. Start Flume
bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console
3. Start the Kafka server
bin/kafka-server-start.sh -daemon config/server.properties
4. Start the log generator class LoggerGenerator
5. Start the Spark Streaming class KafkaStreamingApp_product
(Tested and confirmed working.)