This step actually calls the Kafka client to write message data into Kafka: it randomly generates batches of mock records and keeps writing them to Kafka, producing a continuous message stream.
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
</dependencies>
package com.demo

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockerRealTime {

  /**
   * Generate mock data.
   *
   * Format: timestamp area city userid adid
   * i.e. a point in time, a region, a city, a user and an ad.
   */
  def generateMockData(): Array[String] = {
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10))
    val random = new Random()
    // Mock real-time records:
    // timestamp province city userid adid
    for (i <- 0 to 50) {
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)
      // Concatenate one real-time record
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // Create the configuration object
    val prop = new Properties()
    // Add configuration entries
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // Build a Kafka producer from the configuration
    new KafkaProducer[String, String](prop)
  }

  def main(args: Array[String]): Unit = {
    // Read the Kafka settings from the config.properties file
    val config: Properties = PropertiesUtil.load("config.properties")
    val broker: String = config.getProperty("kafka.broker.list")
    val topic = "test"
    // Create the Kafka producer
    val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
    while (true) {
      // Generate mock data and send it to the Kafka cluster through the producer
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }
      Thread.sleep(2000)
    }
  }

}
The Kafka producer configuration is handled by createKafkaProducer. The main settings are the broker address (IP and port) and the serializers for the key and value; the target topic is not part of the producer configuration but is specified per record when sending in main.
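These two serializer settings plus the broker list are all the demo needs; everything else stays at Kafka's defaults. If you want to experiment with further producer options, the sketch below shows one way createKafkaProducer could be extended. The acks/retries/linger values and the ProducerTuningSketch object are illustrative assumptions, not part of the original demo:

package com.demo

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object ProducerTuningSketch {
  // Hypothetical variant of createKafkaProducer with extra, assumed reliability settings.
  def createTunedProducer(broker: String): KafkaProducer[String, String] = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // The values below are illustrative assumptions:
    prop.put(ProducerConfig.ACKS_CONFIG, "all")      // wait for all in-sync replicas to acknowledge
    prop.put(ProducerConfig.RETRIES_CONFIG, "3")     // retry transient send failures
    prop.put(ProducerConfig.LINGER_MS_CONFIG, "5")   // allow a small batching window (ms)
    new KafkaProducer[String, String](prop)
  }
}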
# Kafka configuration
kafka.broker.list=192.168.22.56:9092
The PropertiesUtil helper loads config.properties from the classpath:

package com.demo

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

  // Load the given properties file from the classpath, reading it as UTF-8
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName), "UTF-8"))
    prop
  }

}
RandomOptions implements weighted random selection: each option is placed in a buffer as many times as its weight, so a uniform random index yields a weighted draw.

package com.demo

import scala.collection.mutable.ListBuffer
import scala.util.Random

// An option value together with its selection weight
case class RanOpt[T](value: T, weight: Int)

object RandomOptions {

  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      randomOptions.totalWeight += opt.weight
      // Repeat each value `weight` times so a uniform index picks it with the right probability
      for (i <- 1 to opt.weight) {
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }

}

class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  // Draw one option at random, respecting the weights
  def getRandomOpt: T = {
    val randomNum: Int = new Random().nextInt(totalWeight)
    optsBuffer(randomNum)
  }
}
package com.demo

/**
 * City information table
 *
 * @param city_id   city id
 * @param city_name city name
 * @param area      region the city belongs to
 */
case class CityInfo(city_id: Long,
                    city_name: String,
                    area: String)
After starting the program, the records appear both in the IDEA console and in the output of a Kafka command-line consumer.
1645151518980 华南 深圳 6 6
1645151518980 华南 深圳 2 3
1645151518980 华南 深圳 4 6
1645151518980 华东 上海 3 6
1645151518980 华北 北京 2 4
1645151518980 华东 上海 6 2
1645151518980 华北 北京 2 1
Kafka message output.
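Besides the console consumer, the same records can be read back programmatically. The following is only a minimal consumer sketch: the MockDataConsumerCheck object and the group id mock-data-check are made-up names, while the broker and topic reuse the values from config.properties and MockerRealTime above.

package com.demo

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object MockDataConsumerCheck {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    // Broker and topic reuse the values from config.properties and MockerRealTime
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.22.56:9092")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "mock-data-check") // hypothetical group id
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](prop)
    consumer.subscribe(Collections.singletonList("test"))
    while (true) {
      // Poll for new records and print their values, like the console consumer does
      val records = consumer.poll(Duration.ofMillis(1000))
      for (record <- records.asScala) {
        println(record.value())
      }
    }
  }
}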