当前位置:   article > 正文

structure streaming输入源之File源实现_structured streaming中file源生成json文件

structured streaming中file源生成json文件

文件放入到给定目录的操作应该具有原子性,即不能长时间在给定目录下打开文件写入内容,而是应该写入到临时文件后移动文件到给定目录下

步骤:
1.创建程序生成jason格式的file源测试数据
2.创建程序对数据进行统计

#!/usr/bin/env python3
#-*-coding: utf8-*-

import os
import shutil
import random
import time

TEST_DATA_TEMP_DIR='/tmp/'
TEST_DATA_DIR='/tmp/testdata/'

ACTION_DEF=['login','logout','purchase']#三种用户行为
DISTRICT_DEF = ['fujian','beijing','shanghai','guangzhou']
JSON_LINE_PATTREN = '{{"eventTime":{},"action":"{}","district":"{}"}}\n'#统一模式

#测试文件夹是否存在,如果有就清空掉重新创建
def test_setUp():
  if os.path.exists(TEST_DATA_DIR):
    shutil.rmtree(TEST_DATA_DIR,ignore_errors=True)
  os.mkdir(TEST_DATA_DIR)
#测试环境的恢复,对文件夹进行清洗,结束时用它把文件都删了
def test_tearDown():
  if os.path.exists(TEST_DATA_DIR):
    shutil.rmtree(TEST_DATA_DIR,ignore_errors = True)
#生成测试文件
def write_and_move(filename,data):
  with open(TEST_DATA_TEMP_DIR+filename,"wt",encoding="utf-8") as f:
    f.write(data)
  shutil.move(TEST_DATA_TEMP_DIR+filename,TEST_DATA_DIR+filename)

if__nane__=="__main__":
  test_setUp()
  #生成1000个文件每个文件里是100行jason数据
  for i in range(1000):
    filename='e-mall-{}.jason'.format(i)

    content =''
    rndcount = list(range(100))
    random.shuffle(rndcount)#打乱0到99
    for_in rndcount:
      content +=JASON_LINE_PATTERN.format(str(int(time.time))),random.choice(ACTION_DEF),random.choice(DISTRICT_DEF)#取出当前时间,random.choice就是随便取一个出来
    write_and_remove(filename,content)
    
    time.sleep(1)
  test_tearDown()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

vim spark_ss_filesource.py

#!/usr/bin/env python3
#-*-coding: utf8-*-

import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window,asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimeStampType, StringTyoe 
#要被统计的文件就在这个目录下
TEST_DATA_DIR_TEMP='file:///tmp/testdata/'

if__Name__=='__main__':
  schema = StructType([StructField('eventTime', TimestampType(), True),StructField('action',StringType(),True),StructField('district',StringType(),True)])

  spark =SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate()
  spark.sparkContext.setLogLevel('WARN')
#每次最多读100个文件,这个路径下有一对jason文件
lines=spark.readStream.format('jason').schema(schema).option("maxFilesPerTrigger",100).load(TEST_DATA_DIR)
#窗口统计,在指定的窗口大小内有多少人购买
windowDuration = '1 minutes'
#只统计购买的用户,按地区分离,窗口统计给出时间轴,时间间隔时多少,排序
windowCounts = lines.filter("action='purchase'").groupBy('district',window('eventTime',windowDuration)).count().sort(asc('window'))
#启动流计算
query =windowCounts.writeStream.outputMode("complete").format("console").option('truncate','false').trigger(processingTime = "10 seconds").start()#过长不要截断,每隔10s执行一次流计算

query.awitTermination()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

开始测试
cd /usr/local/hadoop
sbin/start-dfs.sh
新建数据源终端
cd /usr/local/spark/mycode/structuredstreaming/file
python3 sparkspark_ss_filesource_generate.py
新建流计算客户端终端
cd /usr/local/spark/mycode/structuredstreaming/file
python3 spark_ss_filesource.py

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/709947
推荐阅读
相关标签
  

闽ICP备14008679号