当前位置:   article > 正文

笔记二十三:Structured Streaming基础_structure streaming outputmode("complete")

structure streaming outputmode("complete")

Structured Streaming

  1. 结构化流(Structured Streaming)是流式计算思想的一种实现
  2. 是基于 Spark SQL DataFrame API 构建的流处理引擎,提供简单易用的流处理接口,
  3. 允许用户使用类似于批处理的方式对流数据执行 SQL 查询和复杂的数据处理操作。

 

流式程序的开发流程

  1. #1.构建SparkSession对象
  2. #2.Source(指定数据源)
  3. #3.Operations/Transformation(操作,数据处理)
  4. #4.Sink(数据输出)
  5. #5.启动流式任务

结构化流的入门案例

  1. # 需求
  2. 监听node1节点的10086的端口号,从端口号中获取单词数据,将其转换为DF进行单词统计。

  1. from pyspark.sql import SparkSession
  2. import os
  3. import pyspark.sql.functions as F
  4. # 1.构建SparkSession
  5. # 建造者模式:类名.builder.配置…….getOrCreate()
  6. # 自动帮你构建一个SparkConf对象,只要指定你需要哪些配置就可
  7. spark = SparkSession \
  8. .builder \
  9. .master("local[2]") \
  10. .appName("SparkSQLAppName") \
  11. .config("spark.sql.shuffle.partitions", 4) \
  12. .getOrCreate()
  13. # 2.数据输入
  14. #socket = hostname + port,就是socket数据源
  15. #host:主机名
  16. #port:端口号
  17. #format=socket,说明数据源是socket
  18. lines = spark\
  19. .readStream\
  20. .format("socket")\
  21. .option("host","node1")\
  22. .option("port","10086")\
  23. .load()
  24. # 3.数据处理
  25. #和SparkSQL一样
  26. words = lines.select(
  27. F.explode(
  28. F.split(lines['value'], " ")
  29. ).alias("word")
  30. )
  31. wordCounts = words.groupBy("word").count()
  32. # 4.数据输出
  33. lines.printSchema()
  34. #console:控制台
  35. #format:数据输出的介质,console 就是控制台
  36. #outputMode:
  37. #输出模式,append,只能用于非聚合情况
  38. #追加的模式、complete,只能用于聚合的情况
  39. #更新的模式,update,用于聚合和非聚合的情况,但是只显示更新的数据(新增、修改)
  40. query = wordCounts.writeStream.format("console").outputMode("complete")
  41. # 5.启动流式任务
  42. #awaitTermination:阻塞流程程序,处理源源不断到达的数据
  43. #start:启动流式程序
  44. query.start().awaitTermination()

从kafka中读取数据

  1. from pyspark.sql import SparkSession
  2. import os
  3. import pyspark.sql.functions as F
  4. # 1.构建SparkSession
  5. # 建造者模式:类名.builder.配置…….getOrCreate()
  6. # 自动帮你构建一个SparkConf对象,只要指定你需要哪些配置就可
  7. spark = SparkSession \
  8. .builder \
  9. .master("local[2]") \
  10. .appName("SparkSQLAppName") \
  11. .config("spark.sql.shuffle.partitions", 4) \
  12. .getOrCreate()
  13. # 2.数据输入
  14. #kafka:数据源是Kafka的数据源
  15. #kafka.bootstrap.servers:制定Kafka的broker地址
  16. #subscribe:订阅某个Topic
  17. input_df = spark \
  18. .readStream \
  19. .format("kafka") \
  20. .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
  21. .option("subscribe", "test06") \
  22. .load()
  23. # 3.数据处理
  24. input_df.printSchema()
  25. #selectExpr:在DSL中运行执行SQL语句
  26. #方式一,必须掌握,官方写法
  27. #input_df = input_df.selectExpr("cast(value as string)")
  28. #方式二,可选
  29. input_df = input_df.withColumn("value",F.expr("cast(value as string)"))
  30. # 4.数据输出
  31. queyr = input_df.writeStream.format("console").outputMode("append")
  32. # 5.启动流式任务
  33. queyr.start().awaitTermination()
  1. # Kafka的结构化流接收到kafka的数据,最终会转换为一个DF对象
  2. +---+-----+-----+---------+------+---------+-------------+
  3. |key|value|topic|partition|offset|timestamp|timestampType|
  4. +---+-----+-----+---------+------+---------+-------------+
  5. +---+-----+-----+---------+------+---------+-------------+
  6. #1.key
  7. 生产者发送的key数据, 如果没有发送, 即为NULL
  8. #2.value
  9. 生产者发送的value数据, value也是消息数据
  10. #3.topic
  11. 消息是从哪个topic接收的
  12. #4.partition
  13. 消息是从哪个topic的那个分片上接收的
  14. #5.offset
  15. 偏移量信息
  16. #6.timestamp
  17. 时间戳 (消费消息时间)
  18. #7.timestampType
  19. 时间戳类型

数据写入Kafka中

  1. from pyspark.sql import SparkSession
  2. import os
  3. # 1.构建SparkSession
  4. # 建造者模式:类名.builder.配置…….getOrCreate()
  5. # 自动帮你构建一个SparkConf对象,只要指定你需要哪些配置就可
  6. spark = SparkSession \
  7. .builder \
  8. .master("local[2]") \
  9. .appName("SparkSQLAppName") \
  10. .config("spark.sql.shuffle.partitions", 4) \
  11. .getOrCreate()
  12. # 2.数据输入
  13. input_df = spark.readStream.format("socket").option("host", "node1").option("port", "10086").load()
  14. # 3.数据处理
  15. input_df.printSchema()
  16. # 4.数据输出
  17. # kafka:把数据写进Kafka
  18. # kafka.bootstrap.servers:Kafka的broker地址
  19. # Topic:指定写到Kafka的哪个Topic
  20. query = input_df \
  21. .writeStream \
  22. .format("kafka") \
  23. .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
  24. .option("topic", "test07") \
  25. .option("checkpointLocation", "file:///root/checkpointLocation2") \
  26. .outputMode("append")
  27. # 5.启动流式任务
  28. query.start().awaitTermination()
  1. #1.Spark读取Kafka的数据,参数需要指定2个:
  2. 参数一:kafka.bootstrap.servers,broker地址
  3. 参数二:subscribe
  4. #2.Spark写入Kafka的数据,参数需要指定2个:
  5. 参数一:kafka.bootstrap.servers,broker地址
  6. 参数二:topic

物联网分析案例

  1. # 数据模拟器
  2. import json
  3. import random
  4. import time
  5. from kafka import KafkaProducer
  6. from pyspark.sql import SparkSession
  7. import pyspark.sql.functions as F
  8. import os
  9. # 2.数据输入
  10. #2.1 构建Kafka的生产者
  11. producer = KafkaProducer(
  12. bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
  13. acks='all',
  14. value_serializer=lambda m: json.dumps(m).encode("utf-8")
  15. )
  16. #2.2 物联网设备类型
  17. deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]
  18. #2.3 模拟数据生成
  19. while True:
  20. index = random.choice(range(0, len(deviceTypes)))
  21. deviceID = f'device_{index}_{random.randrange(1, 20)}'
  22. deviceType = deviceTypes[index]
  23. deviceSignal = random.choice(range(10, 100))
  24. # 组装数据集
  25. print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
  26. 'time': time.strftime('%Y%m%d')})
  27. # 发送数据
  28. producer.send(topic='iot',
  29. value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
  30. 'time': time.strftime('%Y%m%d')}
  31. )
  32. # 间隔时间 5s内随机
  33. time.sleep(random.choice(range(1, 5)))
  1. # 模拟器程序会向kafka中源源不断的产生数据
  2. # 需求
  3. 用Spark从Kafka中接收数据,并且分析信号强度大于30的设备中,
  4. 按照类型统计它们的数量和平均信号强度,打印到标准输出。
  5. **示例数据**
  6. {'deviceID': 'device_5_14', 'deviceType': '窗户', 'deviceSignal': 16, 'time': '20220702'}
  7. {'deviceID': 'device_1_17', 'deviceType': '油烟机', 'deviceSignal': 93, 'time': '20220702'}
  1. # DSL实现
  2. from pyspark.sql import SparkSession
  3. import os
  4. import pyspark.sql.functions as F
  5. # 1.构建SparkSession
  6. # 建造者模式:类名.builder.配置…….getOrCreate()
  7. # 自动帮你构建一个SparkConf对象,只要指定你需要哪些配置就可
  8. spark = SparkSession \
  9. .builder \
  10. .master("local[2]") \
  11. .appName("SparkSQLAppName") \
  12. .config("spark.sql.shuffle.partitions", 4) \
  13. .getOrCreate()
  14. # 2.数据输入
  15. input_df = spark\
  16. .readStream\
  17. .format("kafka")\
  18. .option("kafka.bootstrap.servers","node1:9092,node2:9092,node3:9092")\
  19. .option("subscribe","iot")\
  20. .load()
  21. # 3.数据处理
  22. input_df.printSchema()
  23. print("---------------1.DSL-----------------")
  24. result_df = input_df.selectExpr("cast(value as string)")
  25. #json_tuple(jsonStr,p1,p2...pn)
  26. #json_tuple:专门处理json数据的函数,可以给多个字段,可以抽取每个字段的值
  27. #jsonStr:待处理的json字符串
  28. #p1~pn:字段名
  29. result_df = result_df.select(F.json_tuple(result_df['value'],"deviceID","deviceType","deviceSignal","time")
  30. .alias("deviceID","deviceType","deviceSignal","time"))
  31. result_df = result_df\
  32. .where("deviceSignal > 30")\
  33. .groupBy("deviceType")\
  34. .agg(F.count("deviceType").alias("cnt"),F.avg("deviceSignal").alias("avg_signal"))
  35. # 4.数据输出
  36. #option("truncate","False"):设置不截断
  37. query = result_df.writeStream.format("console").outputMode("complete").option("truncate","False")
  38. # 5.启动流式任务
  39. query.start().awaitTermination()
  1. # 输出
  2. Batch: 2
  3. -------------------------------------------
  4. +----------+---------------+-----------------+
  5. |deviceID |count(deviceID)|avg(deviceSignal)|
  6. +----------+---------------+-----------------+
  7. |device_6_5|1 |85.0 |
  8. |device_7_5|1 |33.0 |
  9. +----------+---------------+-----------------+
  10. ......
  11. Batch: 6
  12. -------------------------------------------
  13. +-----------+---------------+-----------------+
  14. |deviceID |count(deviceID)|avg(deviceSignal)|
  15. +-----------+---------------+-----------------+
  16. |device_6_5 |1 |85.0 |
  17. |device_0_13|1 |39.0 |
  18. |device_1_16|1 |56.0 |
  19. |device_7_5 |1 |33.0 |
  20. +-----------+---------------+-----------------+
  1. # SQL实现
  2. from pyspark.sql import SparkSession
  3. import os
  4. import pyspark.sql.functions as F
  5. # 1.构建SparkSession
  6. # 建造者模式:类名.builder.配置…….getOrCreate()
  7. # 自动帮你构建一个SparkConf对象,只要指定你需要哪些配置就可
  8. spark = SparkSession \
  9. .builder \
  10. .master("local[2]") \
  11. .appName("SparkSQLAppName") \
  12. .config("spark.sql.shuffle.partitions", 4) \
  13. .getOrCreate()
  14. # 2.数据输入
  15. input_df = spark\
  16. .readStream\
  17. .format("kafka")\
  18. .option("kafka.bootstrap.servers","node1:9092,node2:9092,node3:9092")\
  19. .option("subscribe","iot")\
  20. .load()
  21. # 3.数据处理
  22. input_df.printSchema()
  23. # 4.数据输出
  24. #option("truncate","False"):设置不截断
  25. print("---------------2.SQL-----------------")
  26. input_df.createOrReplaceTempView("t1")
  27. #UDTF函数的使用语法:源表 lateral view UDTF函数()
  28. result_df = spark.sql("""
  29. select
  30. deviceType,
  31. count(deviceType) as cnt,
  32. avg(deviceSignal) as avg_signal
  33. from t1 lateral view
  34. json_tuple(cast(value as string),"deviceID","deviceType","deviceSignal","time") as deviceID,deviceType,deviceSignal,`time`
  35. where deviceSignal > 30
  36. group by deviceType
  37. """)
  38. query = result_df.writeStream.format("console").outputMode("complete").option("truncate","False")
  39. # 5.启动流式任务
  40. query.start().awaitTermination()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/709964
推荐阅读
相关标签
  

闽ICP备14008679号