
PySpark Structured Streaming with Kafka

    # Kafka data generator: spark_ss_kafka_producer.py
    import random
    import string
    import time

    from kafka import KafkaProducer

    if __name__ == '__main__':
        producer = KafkaProducer(bootstrap_servers=['master:9092', 'slave1:9092', 'slave2:9092'])
        while True:
            # Build a random two-letter lowercase word.
            word = ''.join(random.choice(string.ascii_lowercase) for _ in range(2))
            # Kafka message values are sent as bytes.
            value = word.encode('utf-8')
            # Block until the broker acknowledges the record (at most 10 s).
            producer.send('test', value=value).get(timeout=10)
            time.sleep(0.1)
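The producer encodes every word by hand before sending it. As an alternative, kafka-python lets the producer do the encoding through its `value_serializer` argument. The following is a minimal sketch under that assumption; `random_word`, `encode_utf8`, and `make_producer` are hypothetical helper names that do not appear in the original script.

```python
import random
import string


def random_word(length=2):
    """Return a random lowercase ASCII word of the given length."""
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))


def encode_utf8(text):
    """Serialize a str into the bytes payload sent to Kafka."""
    return text.encode('utf-8')


def make_producer(servers):
    """Hypothetical helper: build a producer that serializes str values itself.

    kafka-python is imported lazily so the two helpers above remain
    usable (and testable) on a machine without the library installed.
    """
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=servers, value_serializer=encode_utf8)
```

With a producer built this way, the loop body could shrink to `producer.send('test', value=random_word()).get(timeout=10)`, since the serializer handles the conversion to bytes.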
    # Consumer side: Spark consumes the Kafka data. spark_ss_kafka_consumer.py
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        spark = SparkSession.builder.getOrCreate()
        # selectExpr() works like select() but takes SQL expression strings.
        # Here it casts the value column of the test topic to STRING.
        # Equivalent form: select(col('value').cast('string')), where col
        # comes from pyspark.sql.functions.
        lines = (
            spark.readStream.format('kafka')
            .option('kafka.bootstrap.servers', 'master:9092,slave1:9092,slave2:9092')
            .option('subscribe', 'test')
            .load()
            .selectExpr("CAST(value AS STRING)")
        )
        wordCounts = lines.groupBy("value").count()
        query = (
            wordCounts.selectExpr(
                "CAST(value AS STRING) AS key",
                "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) AS value",
            )
            .writeStream.outputMode("complete").format("kafka")
            .option("kafka.bootstrap.servers", "master:9092,slave1:9092,slave2:9092")
            .option("topic", "test_count")
            .option("checkpointLocation", "file:///opt/software/tmp/kafka-sink-cp")
            .trigger(processingTime="8 seconds")
            .start()
        )
        query.awaitTermination()
    """
    Kafka read (source) configuration
    option settings:
        kafka.bootstrap.servers: comma-separated broker addresses
        subscribe: topic to read
    format: kafka
    selectExpr(): the value column must be cast to a string
    """
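The read settings listed above can be gathered in one dictionary and passed to the reader in a single call. This is a minimal sketch, not the author's code: `kafka_source_options` and `read_topic_as_strings` are hypothetical helpers, and the second one assumes an existing `SparkSession` with the Kafka connector available.

```python
def kafka_source_options(brokers, topic):
    """Build the option dict for the Kafka source (hypothetical helper)."""
    if not brokers:
        raise ValueError("at least one broker address is required")
    return {
        # Kafka client settings carry the "kafka." prefix,
        # and the broker list is a single comma-separated string.
        "kafka.bootstrap.servers": ",".join(brokers),
        "subscribe": topic,
    }


def read_topic_as_strings(spark, brokers, topic):
    """Return a streaming DataFrame whose value column is cast to STRING."""
    return (
        spark.readStream
        .format("kafka")
        .options(**kafka_source_options(brokers, topic))
        .load()
        .selectExpr("CAST(value AS STRING)")
    )
```

For the cluster in this post, the call would be `read_topic_as_strings(spark, ['master:9092', 'slave1:9092', 'slave2:9092'], 'test')`.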
    """
    Kafka write (sink) configuration
    option settings:
        kafka.bootstrap.servers
        topic: topic to write to
        checkpointLocation: checkpoint location
    format: kafka
    """
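To see what ends up in the `test_count` topic, the aggregation can be mimicked in plain Python without a cluster. The sketch below is an illustration only, with assumptions called out: each inner list stands for the words that arrived during one trigger interval, `complete_mode_batches` and `parse_count_record` are hypothetical names, and the sorting exists only to make the output deterministic (Spark does not guarantee row order).

```python
from collections import Counter


def complete_mode_batches(micro_batches):
    """Yield, for each micro-batch, the full list of (key, value) records.

    Mirrors outputMode("complete"): every trigger re-emits the whole
    running count table, not only the words that changed.
    """
    counts = Counter()
    for batch in micro_batches:
        counts.update(batch)
        # key = word, value = "word:count", like the CONCAT projection.
        # Sorted only so that the result is deterministic in this sketch.
        yield [(word, f"{word}:{n}") for word, n in sorted(counts.items())]


def parse_count_record(value):
    """Split a b"word:count" payload read back from the sink topic."""
    word, _, count = value.decode("utf-8").rpartition(":")
    return word, int(count)


if __name__ == "__main__":
    for records in complete_mode_batches([["ab", "cd", "ab"], ["cd"]]):
        print(records)
```

Because the full table is rewritten on every trigger, a consumer of `test_count` sees repeated keys with growing counts and should keep only the latest value per key.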
