0.1 Lecture file source - JSON data task. Following the JSON data generation and analysis in the lecture notes, reproduce the experiment and give a brief analysis.
import os
import shutil
import random
import time

TEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'

ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n'

# Set up the test environment: if the directory already exists, remove the
# old data, then (re)create the directory
def test_setUp():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)
    os.mkdir(TEST_DATA_DIR)

# Restore the test environment: clean up the directory
def test_tearDown():
    if os.path.exists(TEST_DATA_DIR):
        shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)

# Generate a test file
def write_and_move(filename, data):
    with open(TEST_DATA_TEMP_DIR + filename, "wt", encoding="utf-8") as f:
        f.write(data)
    shutil.move(TEST_DATA_TEMP_DIR + filename, TEST_DATA_DIR + filename)

if __name__ == "__main__":
    test_setUp()
    # Generate 200 files, each containing 100 JSON lines
    for i in range(200):
        filename = 'e-mall-{}.json'.format(i)
        content = ''
        rndcount = list(range(100))
        random.shuffle(rndcount)
        for _ in rndcount:
            content += JSON_LINE_PATTERN.format(
                str(int(time.time())),
                random.choice(ACTION_DEF),
                random.choice(DISTRICT_DEF))
        write_and_move(filename, content)
        time.sleep(1)
# Import the required modules
import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# Path constant for the JSON files (a local path)
TEST_DATA_DIR_SPARK = '/tmp/testdata/'

if __name__ == "__main__":
    # Define the schema: a timestamp-typed eventTime plus string-typed
    # action and district fields
    schema = StructType([
        StructField("eventTime", TimestampType(), True),
        StructField("action", StringType(), True),
        StructField("district", StringType(), True)])

    spark = SparkSession \
        .builder \
        .appName("StructuredEMallPurchaseCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("json") \
        .schema(schema) \
        .option("maxFilesPerTrigger", 100) \
        .load(TEST_DATA_DIR_SPARK)

    # Define the window
    windowDuration = '1 minutes'

    windowedCounts = lines \
        .filter("action = 'purchase'") \
        .groupBy('district', window('eventTime', windowDuration)) \
        .count() \
        .sort(asc('window'))

    query = windowedCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false') \
        .trigger(processingTime="10 seconds") \
        .start()

    query.awaitTermination()
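To reproduce the experiment, the two scripts above are run in separate terminals: one keeps generating JSON files into /tmp/testdata/, the other runs the Structured Streaming query. A hedged example follows; the file names gen_json_data.py and spark_ss_filesource.py are illustrative (the notes do not fix them), and spark-submit is assumed to be on the PATH.

# Terminal 1: keep generating JSON test files (script name is illustrative)
python3 gen_json_data.py

# Terminal 2: run the Structured Streaming analysis (script name is illustrative)
spark-submit spark_ss_filesource.py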
0.2 Lecture Kafka source, two-letter word analysis task. Reproduce the Kafka source experiment as required by the lecture notes.
cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka
./bin/kafka-server-start.sh config/server.properties
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic
cd /usr/local/kafka
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic
# spark_ss_kafka_producer.py
import string
import random
import time
from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    while True:
        s2 = (random.choice(string.ascii_lowercase) for _ in range(2))
        word = ''.join(s2)
        value = bytearray(word, 'utf-8')
        producer.send('wordcount-topic', value=value) \
                .get(timeout=10)
        time.sleep(0.1)
# spark_ss_kafka_consumer.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredKafkaWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", 'wordcount-topic') \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    wordCounts = lines.groupBy("value").count()

    query = wordCounts \
        .selectExpr(
            "CAST(value AS STRING) as key",
            "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \
        .writeStream \
        .outputMode("complete") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "wordcount-result-topic") \
        .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start()

    query.awaitTermination()
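The Kafka integration is not bundled with Spark, so the consumer script has to be submitted with the spark-sql-kafka package on the classpath, and the producer needs the kafka-python library. A hedged example follows: the package version 3.2.0 and the Scala suffix 2.12 are assumptions and must be adjusted to match the local Spark installation, and spark-submit is assumed to be on the PATH.

# Install the Python Kafka client used by the producer
pip3 install kafka-python

# Terminal 1: run the producer
python3 spark_ss_kafka_producer.py

# Terminal 2: submit the consumer with the Kafka connector on the classpath
# (adjust the package version to the local Spark/Scala versions)
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 spark_ss_kafka_consumer.py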
0.3 Lecture socket source: word count with Structured Streaming. Reproduce the socket source experiment as required by the lecture notes.
# StructuredNetworkWordCount.py
# Step 1: import the pyspark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

# Step 2: create a SparkSession object
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    # Step 3: create the input source, which receives text data from the
    # service listening on port 9999 of the local machine (localhost)
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # Step 4: with the input source in place, define the query to run
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )
    wordCounts = words.groupBy("word").count()

    # Step 5: start the stream computation and output the results
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="8 seconds") \
        .start()

    query.awaitTermination()
cd /opt/module/hadoop
./sbin/start-dfs.sh
nc -lk 9999
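Once nc is listening on port 9999, the word-count script is submitted in another terminal; whatever is typed into the nc terminal then appears, counted, in the console output every 8 seconds. The command below assumes spark-submit is on the PATH.

# In another terminal, submit the Structured Streaming word-count job
spark-submit StructuredNetworkWordCount.py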
0.4 (Optional) Use the rate source to evaluate system performance.
# spark_ss_rate.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("TestRateStreamSource") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("rate") \
        .option('rowsPerSecond', 5) \
        .load()
    print(lines.schema)

    query = lines \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option('truncate', 'false') \
        .start()

    query.awaitTermination()
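To actually use the rate source for a rough performance check, one can raise rowsPerSecond and poll the running query's progress report instead of blocking forever. The sketch below is a possible extension of spark_ss_rate.py, not part of the lecture notes: it reuses the query variable defined above and replaces the final query.awaitTermination() call; inputRowsPerSecond and processedRowsPerSecond are standard fields of Structured Streaming's progress report.

import time

# Poll the progress report a few times and print the throughput that Spark
# measured for the most recent micro-batch, then stop the query.
for _ in range(6):
    time.sleep(10)
    progress = query.lastProgress          # dict with the latest progress, or None
    if progress is not None:
        print("batchId:", progress.get("batchId"),
              " inputRowsPerSecond:", progress.get("inputRowsPerSecond"),
              " processedRowsPerSecond:", progress.get("processedRowsPerSecond"))

query.stop()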
1.1 Sending syslog to Spark over a socket. Log analysis is a fairly common scenario in big data analytics.
tail -n+1 -f /var/log/syslog | nc -lk 9988

Here "tail -n+1 -f /var/log/syslog" prints the syslog file from its first line and keeps following newly appended entries, while "nc -lk 9988" serves that output to clients connecting on port 9988.
logger 'I am a test error log message.'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create the SparkContext and StreamingContext
sc = SparkContext(appName="SyslogAnalysis")
ssc = StreamingContext(sc, 1)

# Create a DStream that receives data from the socket
lines = ssc.socketTextStream("localhost", 9988)

# Apply transformations and actions to the stream
word_counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y)

# Print the results to the console
word_counts.pprint()

# Start the StreamingContext
ssc.start()
ssc.awaitTermination()
1.2 Querying the syslog
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Create the SparkSession
spark = SparkSession.builder \
    .appName("LogAnalysis") \
    .getOrCreate()

# Schema of the log data
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("message", StringType(), True)
])

# Receive the log stream from the socket
logs = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9988) \
    .load()

# Apply the schema to the received lines. Note that the to_timestamp pattern
# must match the timestamp format actually used in /var/log/syslog,
# otherwise the parsed timestamp will be null.
logs = logs.selectExpr("CAST(value AS STRING)") \
    .selectExpr("to_timestamp(value, 'yyyy-MM-dd HH:mm:ss') AS timestamp",
                "value AS message") \
    .select(col("timestamp"), col("message").alias("log_message"))

# Set a 1-minute watermark on the event-time column. The watermark has to be
# defined on the event-time column before any aggregation (with complete
# output mode it will not drop old state, but it satisfies the requirement).
logs = logs.withWatermark("timestamp", "1 minute")

# Number of logs generated by the CRON process per hour, in time order
cron_logs = logs.filter(col("log_message").contains("CRON")) \
    .groupBy(window("timestamp", "1 hour")) \
    .count() \
    .orderBy("window")

# Total number of logs per process or service per hour
service_logs = logs.groupBy(window("timestamp", "1 hour"), "log_message") \
    .count() \
    .orderBy("window")

# All log lines containing "error"
error_logs = logs.filter(col("log_message").contains("error"))

# Start the streaming queries and write the results to the console
query_cron_logs = cron_logs.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query_service_logs = service_logs.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query_error_logs = error_logs.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait for the streaming queries to finish
query_cron_logs.awaitTermination()
query_service_logs.awaitTermination()
query_error_logs.awaitTermination()
1. Set up a stream to feed the data into Structured Streaming.
import pandas as pd

# Read the data file
data = pd.read_csv('/usr/local/data/dj30.csv')

# Select the required columns
selected_data = data[['Long Date', 'Close']]

# Print the data to the console
print(selected_data)

# Save the data to a file
selected_data.to_csv('/usr/local/data/dj.csv', index=False)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create the SparkSession
spark = SparkSession.builder \
    .appName("StructuredStreamingExample") \
    .getOrCreate()

# Define the data schema
schema = StructType([
    StructField("Long Date", StringType()),
    StructField("Close", DoubleType())
])

# Read the data stream
data_stream = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/usr/local/dj30.csv")

# Process the data stream
processed_stream = data_stream.select("Long Date", "Close")

# Write the results to the console
query = processed_stream.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Wait for the stream to finish
query.awaitTermination()
2. Use Structured Streaming windows to accumulate dj30sum and dj30ct, the sum and the count of the closing prices, respectively.
3. Divide the two streams (dj30sum by dj30ct) to produce dj30avg, and use it to build a 10-day MA and a 40-day MA (moving averages).
4. Compare the two moving averages (the short-term MA and the long-term MA) to indicate buy and sell signals. One possible sketch covering steps 2-4 is given below.
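Steps 2 to 4 are only described in the task, so the following sketch shows one way they could be implemented; it is not the reference solution. It assumes the dj30 CSV lives at /usr/local/dj30.csv as in step 1, that its "Long Date" column follows the "d-MMM-yy" format, and that a 1-day window slide is appropriate; the column names ma10/ma40, the memory-sink table names, and the script name are illustrative. Each moving average is kept in an in-memory table so the two aggregated streams can be compared with an ordinary batch query, which sidesteps the restrictions on joining two streaming aggregations directly.

# dj30_ma_signals.py -- a sketch, not the reference solution. Assumptions:
# CSV path and column names as in step 1, "Long Date" in "d-MMM-yy" format,
# 1-day window slide; ma10/ma40 and the memory-sink table names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, window, sum as sum_, count, when

spark = SparkSession.builder.appName("DJ30MovingAverage").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Step 1: the (Long Date, Close) stream prepared earlier, with an event-time column
prices = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema("`Long Date` STRING, Close DOUBLE") \
    .load("/usr/local/dj30.csv") \
    .withColumn("time", to_timestamp(col("Long Date"), "d-MMM-yy"))

# Steps 2 and 3: windowed sum (dj30sum) and count (dj30ct) of the closing
# price over a sliding window, and dj30avg = dj30sum / dj30ct
def moving_average(df, days, name):
    return df.groupBy(window(col("time"), "{} days".format(days), "1 day")) \
             .agg(sum_("Close").alias("dj30sum"), count("Close").alias("dj30ct")) \
             .withColumn("dj30avg", col("dj30sum") / col("dj30ct")) \
             .selectExpr("window.end AS date", "dj30avg AS {}".format(name))

ma10 = moving_average(prices, 10, "ma10")   # short-term (10-day) MA
ma40 = moving_average(prices, 40, "ma40")   # long-term (40-day) MA

# Keep each moving average in an in-memory table so the two streaming
# aggregations can be compared afterwards with a plain batch query
q10 = ma10.writeStream.outputMode("complete").format("memory").queryName("dj30avg10").start()
q40 = ma40.writeStream.outputMode("complete").format("memory").queryName("dj30avg40").start()
q10.processAllAvailable()
q40.processAllAvailable()

# Step 4: compare the short and long moving averages; a short MA above the
# long MA is read as a buy signal, otherwise as a sell signal
signals = spark.table("dj30avg10").join(spark.table("dj30avg40"), "date") \
    .withColumn("signal", when(col("ma10") > col("ma40"), "buy").otherwise("sell")) \
    .orderBy("date")
signals.show(truncate=False)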