赞
踩
首先需要安装pymysql外部包
from pymysql import Connection

# Open a connection to the local MySQL server.
connection = Connection(
    host="localhost",
    port=3306,
    user="root",
    password="123456",
)
try:
    # Choose the database the query runs against.
    connection.select_db("test")
    # The cursor is used as a context manager so it is closed after the query.
    with connection.cursor() as cursor:
        cursor.execute("select * from people")
        # fetchall() returns every row of the result set.
        results: tuple = cursor.fetchall()
    for result in results:
        print(result)
finally:
    # Release the connection even when the query raises.
    connection.close()
在插入数据时,需要手动提交事务
connection.commit()
或者在Connection构造方法中传参
autocommit=True
Spark是Apache基金会旗下的顶级开源项目,用于对海量数据进行大规模分布式计算。
PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发
PySpark不仅可以作为Python第三方库使用,也可以将程序提交到Spark集群环境中,调度大规模集群进行执行。
国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
PySpark的编程,主要分为三个步骤
想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类 SparkContext 的类对象
SparkContext类对象,是PySpark编程中一切功能的入口。
from pyspark import SparkConf, SparkContext

# Describe the application: local master using "local[*]", named test_spark_app.
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")

# The SparkContext built from that configuration is the entry point to PySpark.
sc = SparkContext(conf=conf)

# Report which Spark version is running.
print(sc.version)

# Stop the SparkContext, which ends the PySpark program.
sc.stop()
PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
RDD对象称之为弹性分布式数据集,是PySpark中数据计算的载体,它可以:
from pyspark import SparkConf, SparkContext

# Build the configuration and the SparkContext entry point.
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# SparkContext.parallelize() converts a Python container into an RDD.
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# collect() is needed to see what an RDD contains.
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

# Stop the SparkContext, which ends the PySpark program.
sc.stop()
from pyspark import SparkConf, SparkContext

# Configure a local Spark application and open its context.
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# SparkContext.textFile() reads a text file and yields an RDD.
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")
file_contents = rdd.collect()
print(file_contents)

# Stop the SparkContext, which ends the PySpark program.
sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

# Build the configuration and the SparkContext entry point.
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# Source data.
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Two chained map() calls: multiply every element by 10, then add 5.
rdd2 = rdd.map(lambda data: data * 10).map(lambda data: data + 5)
print(rdd2.collect())

# Stop the SparkContext, which ends the PySpark program.
sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

# Build the configuration and the SparkContext entry point.
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# Source data: strings containing space-separated words.
rdd = sc.parallelize(["this is", "python 310", "flatmap", "rdd spark"])

# flatMap() splits each string on spaces and flattens the pieces into a
# single RDD of words instead of an RDD of lists.
rdd2 = rdd.flatMap(lambda data: data.split(" "))
print(rdd2.collect())

# Stop the SparkContext, which ends the PySpark program.
sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# (group, score) pairs to aggregate.
scores = [('boy', 99), ('boy', 88), ('boy', 77), ('girl', 99), ('girl', 66)]
rdd = sc.parallelize(scores)

# Add up the scores of each group (boys and girls) by key.
rdd2 = rdd.reduceByKey(lambda total, score: total + score)
print(rdd2.collect())

sc.stop()
提前设置好一个等待读取的txt文档:
spark python java zhang rdd python rdd
python java cpp c cpp spark pyspark zhang
rdd c python py pyspark python pp rdd
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# Read the data file.
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")

# Split on spaces and flatten to get every word.
word_rdd = rdd.flatMap(lambda data: data.split(" "))

# Turn each word into a (word, 1) pair: the word is the key, 1 the count.
word_one_rdd = word_rdd.map(lambda word: (word, 1))

# Sum the counts per word.
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)

# Print the result.
print(result_rdd.collect())

sc.stop()
接受一个处理函数,可用lambda快速编写
函数对RDD数据逐个处理,得到True的保留至返回值的RDD中
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# Keep only the odd numbers of the list. filter() retains the elements for
# which the predicate returns True, so the predicate must test for a
# remainder of 1 (the previous `== 0` test kept the even numbers instead).
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd2 = rdd.filter(lambda num: num % 2 == 1)
print(rdd2.collect())

sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test_spark_app")
)
sc = SparkContext(conf=conf)

# Input list containing repeated values.
numbers_with_duplicates = [1, 2, 3, 4, 5, 1, 2, 6, 7, 3]
rdd = sc.parallelize(numbers_with_duplicates)

# distinct() removes the duplicates.
rdd2 = rdd.distinct()
print(rdd2.collect())

sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# Count the words of the file: split, pair each word with 1, sum per word.
rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\test.txt")
word_rdd = rdd.flatMap(lambda data: data.split(" "))
word_one_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)

# Sort by the count (second tuple item), largest first, in one partition.
result_rdd_sortBy = result_rdd.sortBy(lambda t: t[1], ascending=False, numPartitions=1)
print(result_rdd_sortBy.collect())

sc.stop()
需求:把下述内容写入txt文件,使用Spark读取文件进行计算
各个城市销售额排名,从大到小
全部城市,有哪些商品类别在售卖
北京市有哪些商品类别在售卖
{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"杭州","money":"1550"}
{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"北京","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":"家电","areaName":"北京","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":"家具","areaName":"郑州","money":"1120"}
{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":"家具","areaName":"北京","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":"家具","areaName":"杭州","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":"书籍","areaName":"北京","money":"5550"}
{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":"电脑","areaName":"杭州","money":"6660"}
{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"天津","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":"书籍","areaName":"北京","money":"1230"}
{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}
{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":"服饰","areaName":"杭州","money":"7000"}
from pyspark import SparkConf, SparkContext
import os
import json

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# --- Shared preparation ---
# Each line of the file holds several JSON objects separated by "|";
# split them apart so that every RDD element is one JSON string.
file_rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\data.txt")
json_str_rdd = file_rdd.flatMap(lambda data: data.split("|"))
# Parse every JSON string into a dict.
dict_rdd = json_str_rdd.map(lambda data: json.loads(data))

# --- Requirement 1: rank the cities by total sales, largest first ---
# Keep only (city, amount); "money" is stored as a string, so convert it.
city_money_rdd = dict_rdd.map(lambda data: (data['areaName'], int(data['money'])))
# Sum the amounts per city, then sort by the total in descending order.
aggregation_rdd = city_money_rdd.reduceByKey(lambda a, b: a + b)
aggregation_sort_rdd = aggregation_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(f"城市销售额排名为:{aggregation_sort_rdd.collect()}")

# --- Requirement 2: product categories on sale across all cities ---
# Keep only the category of each record, then remove duplicates.
all_category_rdd = dict_rdd.map(lambda data: data['category'])
category_rdd = all_category_rdd.distinct()
print(f"所有正在售卖的商品有:{category_rdd.collect()}")

# --- Requirement 3: product categories on sale in Beijing ---
# Keep only the records whose city is Beijing.
bj_rdd = dict_rdd.filter(lambda data: data['areaName'] == '北京')
# Keep only the category of each record, then remove duplicates.
bj_category_rdd = bj_rdd.map(lambda data: data['category'])
category_rdd = bj_category_rdd.distinct()
print(f"北京正在售卖的商品有:{category_rdd.collect()}")

# --- Shared teardown ---
sc.stop()
collect:将RDD内容转换为list
reduce:对RDD内容进行自定义聚合
take:取出RDD的前N个元素组成list
count:统计RDD元素个数
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")
sc = SparkContext(conf=conf)

numbers = [1, 2, 3, 4, 5]
rdd = sc.parallelize(numbers)

# Show the collected contents, then the Python type collect() returns.
print(rdd.collect())
print(type(rdd.collect()))

sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test_spark_app")
)
sc = SparkContext(conf=conf)

numbers = [1, 2, 3, 4, 5]
rdd = sc.parallelize(numbers)

# reduce() folds the elements pairwise with the given function; here it adds them.
num = rdd.reduce(lambda left, right: left + right)
print(num)

sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test_spark_app")
sc = SparkContext(conf=conf)

numbers = [1, 2, 3, 4, 5]
rdd = sc.parallelize(numbers)

# take(3) returns the first three elements as a list.
first_three = rdd.take(3)
print(first_three)

sc.stop()
from pyspark import SparkConf, SparkContext
import os

# Tell Spark which Python interpreter to launch for its workers.
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"

conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test_spark_app")
)
sc = SparkContext(conf=conf)

numbers = [1, 2, 3, 4, 5]
rdd = sc.parallelize(numbers)

# count() returns how many elements the RDD holds.
element_count = rdd.count()
print(element_count)

sc.stop()
将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统
调用保存文件的算子,需要配置Hadoop依赖
下载Hadoop安装包
解压到电脑任意位置
在Python代码中使用os模块配置:os.environ['HADOOP_HOME'] = 'HADOOP解压文件夹路径'
下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
修改rdd分区为1个
case1:SparkConf对象设置属性全局并行度为1:
# Set the global default parallelism to 1 as part of the configuration chain.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test_spark_app")
    .set("spark.default.parallelism", "1")
)
sc = SparkContext(conf=conf)
case2:创建RDD的时候设置(parallelize方法传入numSlices参数为1)
# Request a single partition when the RDD is created, via the numSlices keyword...
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
# ...or, equivalently, by passing the partition count as the second positional argument.
rdd2 = sc.parallelize([1, 2, 3, 4, 5], 1)
# Write each RDD out as text under the given output path.
rdd1.saveAsTextFile("rdd1")
rdd2.saveAsTextFile("rdd2")
from pyspark import SparkConf, SparkContext
import json
import os

# Tell Spark which Python interpreter to launch and where Hadoop lives
# (saveAsTextFile needs the Hadoop dependency configured).
os.environ['PYSPARK_PYTHON'] = "D:\\MyStudy\\Environment\\Python\\Python310\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\MyStudy\\Environment\\Hadoop\\spark-3.3.1-bin-hadoop3"

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# A global parallelism of 1 keeps the data in a single partition.
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# Read the search log; every line is one tab-separated record.
file_rdd = sc.textFile("D:\\MyStudy\\MyCode\\PycharmProjects\\py_mysql\\search_log.txt")

# --- Requirement 1: top 3 busiest search hours ---
# Field 0 is the time (e.g. 00:00:00); its first two characters are the hour.
# Count the records per hour, sort by count descending and take the first 3.
result1 = (
    file_rdd.map(lambda x: x.split("\t"))
    .map(lambda x: x[0][:2])
    .map(lambda x: (x + "点", 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    .take(3)
)
print(f"热门搜索时间段(小时精度)Top3:{result1}")

# --- Requirement 2: top 3 search keywords ---
# Field 2 is the search keyword.
result2 = (
    file_rdd.map(lambda x: (x.split("\t")[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    .take(3)
)
print(f"热门搜索词Top3:{result2}")

# --- Requirement 3: hour in which the keyword "黑马程序员" is searched most ---
result3 = (
    file_rdd.map(lambda x: x.split("\t"))
    .filter(lambda x: x[2] == "黑马程序员")
    .map(lambda x: (x[0][:2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    .take(1)
)
print(f"统计黑马程序员关键字在哪个时段被搜索最多:{result3}")

# --- Requirement 4: convert every record to JSON and write it to a file ---
# Saving the dict directly would write its Python repr (single quotes), which
# is not valid JSON, so each record is serialised with json.dumps first.
# ensure_ascii=False keeps non-ASCII keywords readable in the output.
(
    file_rdd.map(lambda x: x.split("\t"))
    .map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2],
                    "rank1": x[3], "rank2": x[4], "url": x[5]})
    .map(lambda record: json.dumps(record, ensure_ascii=False))
    .saveAsTextFile("output_json")
)
print("文件已写出")

sc.stop()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。