当前位置:   article > 正文

spark stream 3.0.0 scala版本写入kafka消息数据_scala往kafka写数据

scala往kafka写数据

这里实际上是调用kafka客户端来执行kafka消息数据写入的。这里模拟随机产生一系列数据,持续写入kafka,形成持续的消息流数据。

1. 添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-streaming_2.12</artifactId>
  10. <version>3.0.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.spark</groupId>
  14. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  15. <version>3.1.0</version>
  16. </dependency>
  17. </dependencies>

2. 测试代码

  1. package com.demo
  2. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  3. import java.util.Properties
  4. import scala.collection.mutable.ArrayBuffer
  5. import scala.util.Random
  6. object MockerRealTime {
  7. /**
  8. * 模拟的数据
  9. *
  10. * 格式 :timestamp area city userid adid
  11. * 某个时间点 某个地区 某个城市 某个用户 某个广告
  12. */
  13. def generateMockData(): Array[String] = {
  14. val array: ArrayBuffer[String] = ArrayBuffer[String]()
  15. val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
  16. RanOpt(CityInfo(2, "上海", "华东"), 30),
  17. RanOpt(CityInfo(3, "广州", "华南"), 10),
  18. RanOpt(CityInfo(4, "深圳", "华南"), 20),
  19. RanOpt(CityInfo(5, "天津", "华北"), 10))
  20. val random = new Random()
  21. // 模拟实时数据:
  22. // timestamp province city userid adid
  23. for (i <- 0 to 50) {
  24. val timestamp: Long = System.currentTimeMillis()
  25. val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
  26. val city: String = cityInfo.city_name
  27. val area: String = cityInfo.area
  28. val adid: Int = 1 + random.nextInt(6)
  29. val userid: Int = 1 + random.nextInt(6)
  30. // 拼接实时数据
  31. array += timestamp + " " + area + " " + city + " " + userid + " " + adid
  32. }
  33. array.toArray
  34. }
  35. def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
  36. // 创建配置对象
  37. val prop = new Properties()
  38. // 添加配置
  39. prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
  40. prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  41. "org.apache.kafka.common.serialization.StringSerializer")
  42. prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  43. "org.apache.kafka.common.serialization.StringSerializer")
  44. // 根据配置创建 Kafka 生产者
  45. new KafkaProducer[String, String](prop)
  46. }
  47. def main(args: Array[String]): Unit = {
  48. // 获取配置文件 config.properties 中的 Kafka 配置参数
  49. val config: Properties = PropertiesUtil.load("config.properties")
  50. val broker: String = config.getProperty("kafka.broker.list")
  51. val topic = "test"
  52. // 创建 Kafka 消费者
  53. val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
  54. while (true) {
  55. // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
  56. for (line <- generateMockData()) {
  57. kafkaProducer.send(new ProducerRecord[String, String](topic, line))
  58. println(line)
  59. }
  60. Thread.sleep(2000)
  61. }
  62. }
  63. }

kafka生产者参数配置主要由createKafkaProducer完成。

主要的配置内容时kafka的ip地址,端口号,topic以及key和value的序列化。

3. kafka配置(config.properties)

  1. # Kafka 配置
  2. kafka.broker.list=192.168.22.56:9092

4. 辅助代码(PropertiesUtil.scala)

  1. package com.demo
  2. import java.io.InputStreamReader
  3. import java.util.Properties
  4. object PropertiesUtil {
  5. def load(propertiesName:String): Properties ={
  6. val prop=new Properties()
  7. prop.load(new
  8. InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) , "UTF-8"))
  9. prop
  10. }
  11. }

5. 辅助代码(RandomOptions.scala)

  1. package com.demo
  2. import scala.collection.mutable.ListBuffer
  3. import scala.util.Random
  4. case class RanOpt[T](value: T, weight: Int)
  5. object RandomOptions {
  6. def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
  7. val randomOptions = new RandomOptions[T]()
  8. for (opt <- opts) {
  9. randomOptions.totalWeight += opt.weight
  10. for (i <- 1 to opt.weight) {
  11. randomOptions.optsBuffer += opt.value
  12. }
  13. }
  14. randomOptions
  15. }
  16. }
  17. class RandomOptions[T](opts: RanOpt[T]*) {
  18. var totalWeight = 0
  19. var optsBuffer = new ListBuffer[T]
  20. def getRandomOpt: T = {
  21. val randomNum: Int = new Random().nextInt(totalWeight)
  22. optsBuffer(randomNum)
  23. }
  24. }

6. 辅助代码(CityInfo.scala)

  1. package com.demo
  2. /**
  3. *
  4. * 城市信息表
  5. *
  6. * @param city_id 城市 id
  7. * @param city_name 城市名称
  8. * @param area 城市所在大区
  9. */
  10. case class CityInfo (city_id:Long,
  11. city_name:String,
  12. area:String)

7. 执行程序测试

可以同时看到idea控制台和kafka的命令行消费者输出。

  1. 1645151518980 华南 深圳 6 6
  2. 1645151518980 华南 深圳 2 3
  3. 1645151518980 华南 深圳 4 6
  4. 1645151518980 华东 上海 3 6
  5. 1645151518980 华北 北京 2 4
  6. 1645151518980 华东 上海 6 2
  7. 1645151518980 华北 北京 2 1

kafka消息输出。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/606378
推荐阅读
相关标签
  

闽ICP备14008679号