赞
踩
文件放入到给定目录的操作应该具有原子性,即不能长时间在给定目录下打开文件写入内容,而是应该写入到临时文件后移动文件到给定目录下
步骤:
1.创建程序生成jason格式的file源测试数据
2.创建程序对数据进行统计
#!/usr/bin/env python3 #-*-coding: utf8-*- import os import shutil import random import time TEST_DATA_TEMP_DIR='/tmp/' TEST_DATA_DIR='/tmp/testdata/' ACTION_DEF=['login','logout','purchase']#三种用户行为 DISTRICT_DEF = ['fujian','beijing','shanghai','guangzhou'] JSON_LINE_PATTREN = '{{"eventTime":{},"action":"{}","district":"{}"}}\n'#统一模式 #测试文件夹是否存在,如果有就清空掉重新创建 def test_setUp(): if os.path.exists(TEST_DATA_DIR): shutil.rmtree(TEST_DATA_DIR,ignore_errors=True) os.mkdir(TEST_DATA_DIR) #测试环境的恢复,对文件夹进行清洗,结束时用它把文件都删了 def test_tearDown(): if os.path.exists(TEST_DATA_DIR): shutil.rmtree(TEST_DATA_DIR,ignore_errors = True) #生成测试文件 def write_and_move(filename,data): with open(TEST_DATA_TEMP_DIR+filename,"wt",encoding="utf-8") as f: f.write(data) shutil.move(TEST_DATA_TEMP_DIR+filename,TEST_DATA_DIR+filename) if__nane__=="__main__": test_setUp() #生成1000个文件每个文件里是100行jason数据 for i in range(1000): filename='e-mall-{}.jason'.format(i) content ='' rndcount = list(range(100)) random.shuffle(rndcount)#打乱0到99 for_in rndcount: content +=JASON_LINE_PATTERN.format(str(int(time.time))),random.choice(ACTION_DEF),random.choice(DISTRICT_DEF)#取出当前时间,random.choice就是随便取一个出来 write_and_remove(filename,content) time.sleep(1) test_tearDown()
vim spark_ss_filesource.py
#!/usr/bin/env python3 #-*-coding: utf8-*- import os import shutil from pprint import pprint from pyspark.sql import SparkSession from pyspark.sql.functions import window,asc from pyspark.sql.types import StructType, StructField from pyspark.sql.types import TimeStampType, StringTyoe #要被统计的文件就在这个目录下 TEST_DATA_DIR_TEMP='file:///tmp/testdata/' if__Name__=='__main__': schema = StructType([StructField('eventTime', TimestampType(), True),StructField('action',StringType(),True),StructField('district',StringType(),True)]) spark =SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate() spark.sparkContext.setLogLevel('WARN') #每次最多读100个文件,这个路径下有一对jason文件 lines=spark.readStream.format('jason').schema(schema).option("maxFilesPerTrigger",100).load(TEST_DATA_DIR) #窗口统计,在指定的窗口大小内有多少人购买 windowDuration = '1 minutes' #只统计购买的用户,按地区分离,窗口统计给出时间轴,时间间隔时多少,排序 windowCounts = lines.filter("action='purchase'").groupBy('district',window('eventTime',windowDuration)).count().sort(asc('window')) #启动流计算 query =windowCounts.writeStream.outputMode("complete").format("console").option('truncate','false').trigger(processingTime = "10 seconds").start()#过长不要截断,每隔10s执行一次流计算 query.awitTermination()
开始测试
cd /usr/local/hadoop
sbin/start-dfs.sh
新建数据源终端
cd /usr/local/spark/mycode/structuredstreaming/file
python3 sparkspark_ss_filesource_generate.py
新建流计算客户端终端
cd /usr/local/spark/mycode/structuredstreaming/file
python3 spark_ss_filesource.py
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。