
SparkStreaming (14): Integrating log4j logs with Flume, Kafka, and Spark Streaming


I. Functionality

Simulate log production with log4j and send the log output to a Flume agent. Flume then forwards the log records to Kafka, and Spark Streaming consumes the logs from Kafka and performs some simple processing on them.

II. Steps

1. Goal

Use log4j to emit logs in a fixed format and send them to a specific port on a Flume agent. Flume hands the data to Kafka, and Spark Streaming consumes and processes it.

2. Generating the log4j logs


(1) In IDEA, create a java folder under the test directory and mark it as test sources.

(2) Define the log4j output format and wire it to Flume

-> Add a resources folder under test and create log4j.properties:

  log4j.rootCategory=INFO,stdout,flume
  # console appender and log format
  log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  log4j.appender.stdout.target=System.out
  log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] [%C] [%p] - %m%n
  # flume appender: ship logs to the Flume avro source
  log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
  log4j.appender.flume.Hostname = hadoop
  log4j.appender.flume.Port = 41414
  log4j.appender.flume.UnsafeMode = true

This accomplishes two things: (a) it defines the log output format, and (b) it points the flume appender at a specific Flume host and port, i.e. it ties log4j to Flume.

[Reference: search for "Log4J Appender" in the Flume User Guide: http://flume.apache.org/FlumeUserGuide.html]

  Log format:
  2018-09-23 12:13:52 [main] [LoggerGenerator] [INFO] - current value is :0
  2018-09-23 12:13:54 [main] [LoggerGenerator] [INFO] - current value is :1
  2018-09-23 12:13:55 [main] [LoggerGenerator] [INFO] - current value is :2

(3) Add the Maven dependency

  <dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
  </dependency>

(4) Run the Java program LoggerGenerator again

  import org.apache.log4j.Logger;

  public class LoggerGenerator {

      private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

      // Emit one INFO log line every 100 ms, forever
      public static void main(String[] args) throws Exception {
          int index = 0;
          while (true) {
              Thread.sleep(100);
              logger.info("current value is :" + index++);
          }
      }
  }

 

3. Flume configuration for receiving the logs

(1) Flume agent configuration file streaming2.conf

  agent1.sources=avro-source
  agent1.channels=logger-channel
  agent1.sinks=kafka-sink

  # define source
  agent1.sources.avro-source.type=avro
  agent1.sources.avro-source.bind=0.0.0.0
  agent1.sources.avro-source.port=41414

  # define channel
  agent1.channels.logger-channel.type=memory

  # define sink
  agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
  agent1.sinks.kafka-sink.topic = streamingtopic
  agent1.sinks.kafka-sink.brokerList = hadoop:9092
  agent1.sinks.kafka-sink.requiredAcks = 1
  agent1.sinks.kafka-sink.batchSize = 20

  # bind source and sink to the channel
  agent1.sources.avro-source.channels=logger-channel
  agent1.sinks.kafka-sink.channel=logger-channel

(2) Start Flume [do not start it yet: Kafka is not running, so the agent would start without errors but would start throwing errors as soon as data arrives!]

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console

 

4. Kafka receives the data forwarded by Flume

(1) Start ZooKeeper
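
The exact command depends on how ZooKeeper is installed; these are the usual forms (paths are assumptions, adjust to your environment):

  # standalone ZooKeeper installation
  bin/zkServer.sh start
  # or, using the ZooKeeper scripts bundled with the Kafka distribution
  bin/zookeeper-server-start.sh -daemon config/zookeeper.properties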

(2) Start the Kafka server

bin/kafka-server-start.sh -daemon config/server.properties 

(3) Create the topic

bin/kafka-topics.sh --create --topic streamingtopic --zookeeper hadoop:2181/kafka08 --partitions 1 --replication-factor 1
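
You can confirm the topic was created by listing the topics under the same ZooKeeper chroot:

  bin/kafka-topics.sh --list --zookeeper hadoop:2181/kafka08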

(4) Run a quick test to verify the path from the logs into Kafka

-> Start Flume

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console
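
If the agent came up correctly, its avro source should be listening on port 41414. A quick sanity check (assuming netstat is available on the host):

  netstat -an | grep 41414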

-> Start a Kafka console consumer

bin/kafka-console-consumer.sh --topic streamingtopic --zookeeper hadoop:2181/kafka08

(Tested successfully: with LoggerGenerator running, the log lines show up in the console consumer.)

 

5. Spark Streaming code to process the messages received from Kafka

(1) Code

  package Spark

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.dstream.ReceiverInputDStream
  import org.apache.spark.streaming.kafka.KafkaUtils
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  /**
   * Spark Streaming reading from Kafka
   */
  object KafkaStreamingApp_product {

    def main(args: Array[String]): Unit = {
      if (args.length != 4) {
        System.err.println("Usage: KafkaStreamingApp_product <zkQuorum> <group> <topics> <numThreads>")
        System.exit(1)
      }
      val Array(zkQuorum, group, topics, numThreads) = args

      // For a real cluster, drop setMaster and pass the master via spark-submit;
      // it is hard-coded here for local testing.
      val sparkConf = new SparkConf().setAppName("KafkaStreamingApp_product")
        .setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

      // Receiver-based Kafka integration (see KafkaUtils.createStream in the source)
      val messages: ReceiverInputDStream[(String, String)] =
        KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

      // Each record is a (key, value) pair; take the value (the log line)
      // messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
      messages.map(_._2).count().print()

      ssc.start()
      ssc.awaitTermination()
    }
  }
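
The code above only counts the records in each batch. Since the formatted lines carry the log level in the third bracketed field, a slightly richer example is to count records per level. The snippet below is a minimal sketch, not part of the original program: it would replace the messages.map(_._2).count().print() line inside main, and the regex is an assumption based on the ConversionPattern configured in log4j.properties.

  // Hypothetical extension: count log records per level within each 5-second batch.
  // Assumes lines look like:
  // 2018-09-23 12:13:52 [main] [LoggerGenerator] [INFO] - current value is :0
  val levelPattern = """\[(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\]""".r
  messages.map(_._2)
    .flatMap(line => levelPattern.findFirstMatchIn(line).map(_.group(1)).toSeq)
    .map((_, 1))
    .reduceByKey(_ + _)
    .print()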

(2) Run configuration: add the program arguments

hadoop:2181/kafka08 test streamingtopic 1
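
These arguments are for running inside the IDE. For reference, when the job is packaged and run outside the IDE it would typically be launched with spark-submit; the jar path and the spark-streaming-kafka artifact version below are assumptions that have to match your own build (the code targets the receiver-based Kafka 0.8 integration):

  bin/spark-submit \
    --class Spark.KafkaStreamingApp_product \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
    /path/to/your-app.jar \
    hadoop:2181/kafka08 test streamingtopic 1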

III. Testing

1. Start ZooKeeper

2. Start Flume

bin/flume-ng agent --conf conf --conf-file conf/streaming2.conf --name agent1 -Dflume.root.logger=INFO,console

3. Start the Kafka server

bin/kafka-server-start.sh -daemon config/server.properties 

4. Start the log producer class LoggerGenerator

5. Start the Spark Streaming class KafkaStreamingApp_product

 

(Tested successfully!)
