当前位置:   article > 正文

maxwell+kafka+Spark Streaming构建MySQL Binlog日志采集实时处理方案_sparkstreaming maxwell kafka

sparkstreaming maxwell kafka

需求分析说明

根据业务场景需要实时处理日志进行实时图表展示(Highchart等),如果进行对数据库频繁抽取会对数据库服务器造成较大的压力,相应的web服务也会受到很大的影响;因此,抽取数据库的日志既能够很大的减轻数据库服务的压力,又能够解决实时处理实时展示图表的需求。本博客MySQL Binlog日志采集为例提供解决方案

一、部署安装maxwell采集器 

  1) 首先查看mysql是否已经开启binlog

 2) 下载maxwell

组件下载地址:https://download.csdn.net/download/qq_16220645/11396123
解压tar  -zxvf  maxwell-1.17.1.tar.gz

3)  给mysql授权(只针对于maxwell库的操作)
其中user01为数据库用户名 666666为数据库密码
GRANT ALL on maxwell.* to 'user01'@'%' identified by '666666';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';

4)  执行maxwell命令行(注:maxwell默认是把监听的mysql的binlog日志发送到kafka的主题叫maxwell的topic上的)

具体的demo 如下:
bin/maxwell --user='user01' --password='666666' --host='127.0.0.1' --include_dbs=db1 --include_tables=table1,table2 --producer=kafka --kafka.bootstrap.servers=d1:9092,d2:9092,d3:9092 --kafka_topic test  
注:--user是数据库用户名 --password数据库密码 --host表示安装mysql的服务器地址(可以和安装maxwell的服务器不在同一台) --include_dbs表示要筛选具体的数据库 --include_tables表示筛选具体库下的具体表 --kafka.bootstrap.servers表示kafka的Ip地址和端口号 --kafka_topic kafka表示kafka对应的topic

二、kafka的相关配置(注:d1,d2,d3为每台服务器的hostname,kafka里的配置文件端口号要和命令行里给的端口号一致)

  1) 启动kafka命令行(这里以后台进程方式运行)
   nohup bin/kafka-server-start.sh config/server.properties &

  2)  创建kafka的topic为test主题
   bin/kafka-topics.sh --zookeeper d1:2181,d2:2181,d3:2181 --create --topic test --partitions 20 --replication-factor 1 

  3) 启动消费者窗口
   bin/kafka-console-consumer.sh --bootstrap-server d1:9092,d2:9092,d3:9092  --topic test

三、spark streaming结合kafka

注:本demo的spark版本为2.2.1 kafka的版本为0.10.0,请注意spark版本与kafka版本对应,具体可参考spark官方网站的说明http://spark.apache.org/docs/2.2.1/structured-streaming-kafka-integration.html

  1. package com.baison.realTimeCalculation
  2. import java.lang
  3. import org.apache.kafka.clients.consumer.ConsumerRecord
  4. import org.apache.kafka.common.serialization.StringDeserializer
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.dstream.DStream
  7. import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  8. import org.apache.spark.streaming.kafka010.KafkaUtils
  9. import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  10. import org.apache.spark.streaming.{Durations, StreamingContext}
  11. import scala.util.Try
  12. object IposRealTime {
  13. def main(args: Array[String]): Unit = {
  14. val conf=new SparkConf().setAppName("IposRealTime")
  15. .set("spark.streaming.blockInterval", "50")//生成block的间隔
  16. .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//用kryo序列化
  17. .set("spark.streaming.backpressure.enabled","true") //数据的反压机制
  18. .set("spark.task.maxFailures","10")//task最大失败次数
  19. .set("spark.streaming.kafka.maxRetries","5") //kafka的最大重试次数
  20. .set("spark.streaming.stopGracefullyOnShutdown","true")//程序优雅关闭
  21. .set("spark.io.compression.codec","snappy") //压缩模式
  22. .set("spark.rdd.compress","true") //压缩RDD的分区
  23. .registerKryoClasses(Array(classOf[EveryWeekForm],classOf[HotGoodsForm],classOf[MemberFlowForm],
  24. classOf[TodayYeJiForm]))
  25. val ssc=new StreamingContext(conf,Durations.seconds(2))
  26. //kafka的配置
  27. val kafkaParam=Map[String,Object](
  28. Constants.KAFKA_METADATA_BROKER_LIST->ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST),
  29. "key.deserializer"->classOf[StringDeserializer],
  30. "value.deserializer"->classOf[StringDeserializer],
  31. Constants.KAFKA_GROUP_ID->ConfigurationManager.getProperty(Constants.KAFKA_GROUP_ID),
  32. Constants.KAFKA_AUTO_OFFSET_RESET->ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET),//从该topic最新位置开始读取数据
  33. "enable.auto.commit"->(false:lang.Boolean),
  34. Constants.SESSION_TIMEOUT_MS->ConfigurationManager.getProperty(Constants.SESSION_TIMEOUT_MS) //最大程度的确保Spark集群和kafka连接的稳定性
  35. )
  36. val topics=List(ConfigurationManager.getProperty(Constants.KAFKA_TOPICS)).toSet
  37. val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam)).repartition(50)
  38. ssc.checkpoint(Constants.SPARK_CHECKPOINT_DATA)
  39. //此处进行处理数据操作
  40. ssc.start()
  41. ssc.awaitTermination()
  42. }

如有错误之处,请指正,不胜感激。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/639904
推荐阅读
相关标签
  

闽ICP备14008679号