赞
踩
# Kafka data-generator script: spark_ss_kafka_producer.py
#
# Continuously produces random two-letter lowercase words to the Kafka
# topic 'test', one message roughly every 0.1 s, forever (Ctrl-C to stop).

import random
import string
import time

from kafka import KafkaProducer

if __name__ == '__main__':
    # Connect to the three-broker Kafka cluster.
    producer = KafkaProducer(
        bootstrap_servers=['master:9092', 'slave1:9092', 'slave2:9092'])

    while True:
        # Build a random 2-character lowercase word.
        word = ''.join(random.choice(string.ascii_lowercase) for _ in range(2))
        # Kafka values must be bytes; .get(timeout=10) blocks until the
        # broker acknowledges the send (or raises after 10 s).
        producer.send('test', value=word.encode('utf-8')).get(timeout=10)
        time.sleep(0.1)
-
# Spark Structured Streaming consumer script: spark_ss_kafka_consumer.py
#
# Reads words from the Kafka topic 'test', keeps a running count of each
# distinct word, and writes "word:count" records back to topic 'test_count'.

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()

    # Source: subscribe to the 'test' topic. Note the correct option key is
    # 'kafka.bootstrap.servers' (not 'bootstrap-servers') and the broker
    # list must be comma-separated. Kafka delivers 'value' as binary, so
    # selectExpr() casts it to STRING before grouping.
    lines = (spark.readStream
             .format('kafka')
             .option('subscribe', 'test')
             .option('kafka.bootstrap.servers',
                     'master:9092,slave1:9092,slave2:9092')
             .load()
             .selectExpr("CAST(value AS STRING)"))
    # selectExpr() is like select() but takes SQL expression strings; the
    # equivalent form is select(col('value').cast('string')) with col
    # imported from pyspark.sql.functions.

    # Running count per distinct word (a streaming aggregation).
    wordCounts = lines.groupBy("value").count()

    # Sink: emit the word as the Kafka message key and "word:count" as the
    # value, to topic 'test_count', re-emitting the complete result table
    # every 8 seconds. A checkpoint location is mandatory for a Kafka sink.
    (wordCounts
     .selectExpr(
         "CAST(value AS STRING) as key",
         "CONCAT(CAST(value AS STRING),':',CAST(count AS STRING)) as value")
     .writeStream
     .outputMode("complete")
     .format("kafka")
     .option("topic", "test_count")
     .option("kafka.bootstrap.servers",
             "master:9092,slave1:9092,slave2:9092")
     .option("checkpointLocation", "file:///opt/software/tmp/kafka-sink-cp")
     .trigger(processingTime="8 seconds")
     .start()
     .awaitTermination())

"""
Kafka source configuration:
  format: 'kafka'
  options:
    kafka.bootstrap.servers - comma-separated broker list
    subscribe               - topic to read from
  selectExpr() must cast the binary 'value' column to STRING.
"""
"""
Kafka sink configuration:
  format: 'kafka'
  options:
    kafka.bootstrap.servers - comma-separated broker list
    topic                   - topic to write to
    checkpointLocation      - checkpoint directory (required)
"""
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。