Spark in Practice (4): Quick-Start Examples with Spark + Python (PySpark)

   Most Spark tutorials and sample programs are written in Scala, but I needed to work in Python, so I took Scala examples found online and reimplemented them in Python.

1. Cluster test example

   The code is as follows:
from pyspark.sql import SparkSession

if __name__ == "__main__":
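    # Executor memory has to be configured before the session is created;
    # setting it through spark.conf.set() afterwards does not take effect.
    spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("spark://mini1:7077") \
            .config("spark.executor.memory", "500m") \
            .getOrCreate()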
    sc = spark.sparkContext
    a = sc.parallelize([1, 2, 3])
    b = a.flatMap(lambda x: (x,x ** 2))
    print(a.collect())
    print(b.collect())

   Output:
[Screenshot: program output]
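
To check what `flatMap` does in the example above without a cluster, the same transformation can be sketched in plain Python. This only illustrates the semantics and is not how Spark executes the job; the helper name `flat_map` is made up for this sketch.

```python
from itertools import chain


def flat_map(func, items):
    """Apply func to each item and flatten the resulting iterables by one level."""
    return list(chain.from_iterable(func(x) for x in items))


a = [1, 2, 3]
# Same lambda as in the Spark example: each x yields the pair (x, x ** 2).
b = flat_map(lambda x: (x, x ** 2), a)
# For comparison, a plain map keeps one tuple per input element.
mapped = [(x, x ** 2) for x in a]

print(a)       # [1, 2, 3]
print(b)       # [1, 1, 2, 4, 3, 9]
print(mapped)  # [(1, 1), (2, 4), (3, 9)]
```

The difference from `map` is the flattening step: `map` would return three tuples, while `flatMap` returns six numbers.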

2. Reading from a file

   To make debugging easier, this example runs in local mode.

from pyspark.sql import SparkSession


def formatData(arr):
    # arr is one log line that has already been split on ","
    mb = (arr[0], arr[2])
    flag = arr[3]
    # Python 3's int has arbitrary precision, so py4j.compat.long is not needed
    time = int(arr[1])
    if flag == "1":
        time = -time
    return (mb, time)


if __name__ == "__main__":
    spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local")\
            .getOrCreate()

    sc = spark.sparkContext
    # sc = spark.sparkContext
    line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))
    count = line.map(lambda x: formatData(x))
    rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)
    # print(count.collect())
    line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))

    rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))
    rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))

    rdd3 = rdd.join(rdd1)
    rdd4 = rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))
        # .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)
    rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))
    print(rdd5.collect())

   Source file data:
[Two screenshots of the input files]

   The result is as follows:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]
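
The aggregation above can be traced by hand with a plain-Python sketch. The input files are only shown as screenshots, so the records below are hypothetical, and the layout `phone,timestamp,station_id,flag` is an assumption read off the indices used in `formatData`. Under the further assumption that flag `"1"` marks the start of a connection and any other value its end, the sum of the signed timestamps per `(phone, station)` key is the time spent at that station.

```python
from collections import defaultdict

# Hypothetical log lines in the assumed format: phone,timestamp,station_id,flag
lines = [
    "13800000001,100,A,1",
    "13800000001,160,A,0",
    "13800000001,200,B,1",
    "13800000001,230,B,0",
    "13800000002,300,A,1",
    "13800000002,310,A,0",
]


def format_data(fields):
    """Return ((phone, station), signed_time); flag "1" flips the sign."""
    key = (fields[0], fields[2])
    time = int(fields[1])
    if fields[3] == "1":
        time = -time
    return key, time


# Plain-Python equivalent of reduceByKey(lambda agg, obj: agg + obj).
totals = defaultdict(int)
for line in lines:
    key, signed_time = format_data(line.split(","))
    totals[key] += signed_time

# Equivalent of groupBy(phone) followed by a descending sort on the total.
by_phone = defaultdict(list)
for (phone, station), total in totals.items():
    by_phone[phone].append((phone, station, total))
result = [sorted(rows, key=lambda r: r[2], reverse=True) for rows in by_phone.values()]

print(result)
```

The printed structure has the same shape as the Spark result: one list per phone number, each sorted by total time in descending order.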

3. Reading files and saving the result to a file

from pyspark.sql import SparkSession


def formatData(arr):
    # arr is one log line that has already been split on ","
    mb = (arr[0], arr[2])
    flag = arr[3]
    # Python 3's int has arbitrary precision, so py4j.compat.long is not needed
    time = int(arr[1])
    if flag == "1":
        time = -time
    return (mb, time)


if __name__ == "__main__":
    spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local")\
            .getOrCreate()
    sc = spark.sparkContext
    line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))
    rdd0 = line.map(lambda x: formatData(x))
    rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))
    line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))
    rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))
    rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))

    rdd4 = rdd3.groupBy(lambda x: x[0])
    rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])
    print(rdd1.join(rdd2).collect())
    print(rdd5.collect())
    rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")
    sc.stop()

   The result is as follows:

[Screenshot: program output]
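
The join and the top-2 selection in this example can also be followed in plain Python. All values below are hypothetical, and treating the two extra columns of `lac_info.txt` as longitude and latitude is an assumption; the original only shows that columns 1 and 2 are kept.

```python
from collections import defaultdict

# Hypothetical per-(phone, station) totals, as produced by the reduceByKey step.
totals = {
    ("13800000001", "A"): 60,
    ("13800000001", "B"): 30,
    ("13800000001", "C"): 90,
    ("13800000002", "A"): 10,
}
# Hypothetical station table: station_id -> (longitude, latitude).
stations = {
    "A": ("116.30", "40.05"),
    "B": ("116.29", "40.06"),
    "C": ("116.31", "40.04"),
}

# Inner join on station_id, then flatten to (phone, station, total, lon, lat).
joined = [
    (phone, station, total, *stations[station])
    for (phone, station), total in totals.items()
    if station in stations
]

# Group by phone and keep the two stations with the largest totals.
groups = defaultdict(list)
for row in joined:
    groups[row[0]].append(row)
top2 = {
    phone: sorted(rows, key=lambda r: r[2], reverse=True)[:2]
    for phone, rows in groups.items()
}

for phone, rows in top2.items():
    print(phone, rows)
```

Note that `saveAsTextFile` in the Spark version writes a directory of part files rather than a single file, and it fails if the output directory already exists, so the path has to be removed or changed between runs.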

4. Matching against custom rules

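# "import urllib" alone does not guarantee that the urllib.parse submodule is loaded
from urllib.parse import urlparse
from pyspark.sql import SparkSession


def getUrls(urls):
    # urls is a (url, count) pair; return (host, url, count)
    url = urls[0]
    parsed = urlparse(url)
    return (parsed.netloc, url, urls[1])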

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .master("local") \
        .getOrCreate()
    sc = spark.sparkContext
    line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day02\\itcast.log").map(lambda x: x.split('\t'))
    # Load the rules from the database (hard-coded here for the example)
    arr = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]
    rdd1 = line.map(lambda x: (x[1], 1))
    rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)
    rdd3 = rdd2.map(lambda x: getUrls(x))

    for ins in arr:
        rdd = rdd3.filter(lambda x:x[0] == ins)
        result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)
        print(result)
    spark.stop()

   The result is as follows:
[Screenshot: program output]
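
As a rough illustration of the rule matching, the sketch below counts URLs, extracts the host with `urlparse`, and keeps the two most visited URLs per host in plain Python. The URLs are hypothetical stand-ins for the second column of `itcast.log`, which is not reproduced in the post.

```python
from collections import Counter
from urllib.parse import urlparse

# Hypothetical visited URLs, one entry per log line.
urls = (
    ["http://java.itcast.cn/java/a.shtml"] * 3
    + ["http://java.itcast.cn/java/b.shtml"] * 1
    + ["http://java.itcast.cn/java/c.shtml"] * 2
    + ["http://php.itcast.cn/php/a.shtml"] * 1
)
rules = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]

# Equivalent of map(url -> (url, 1)) followed by reduceByKey.
counts = Counter(urls)
# Equivalent of getUrls: (host, url, count).
rows = [(urlparse(url).netloc, url, n) for url, n in counts.items()]

# For each rule, filter by host and take the two highest counts.
top_by_host = {}
for host in rules:
    matching = [r for r in rows if r[0] == host]
    top_by_host[host] = sorted(matching, key=lambda r: r[2], reverse=True)[:2]

for host, top in top_by_host.items():
    print(host, top)
```

A host with no matching log lines, such as `net.itcast.cn` in this made-up data, simply yields an empty list, which is also what `take(2)` returns on an empty RDD.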

5. Sorting with a custom class

from operator import gt
from pyspark.sql import SparkSession


class Girl:
    def __init__(self, faceValue, age):
        self.faceValue = faceValue
        self.age = age

    def __gt__(self, other):
        if other.faceValue == self.faceValue:
            return gt(self.age, other.age)
        else:
            return gt(self.faceValue, other.faceValue)


if __name__ == "__main__":
    spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local")\
            .getOrCreate()
    sc = spark.sparkContext
    rdd1 = sc.parallelize([("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)])
    rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]), ascending=False)
    print(rdd2.collect())
    sc.stop()

   The result is as follows:

[Screenshot: program output]
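
The class above defines only `__gt__`; Python's sort compares with `<` and falls back to the reflected `__gt__`, which is why it works. A slightly more explicit variant, sketched here in plain Python as one possible alternative rather than the original author's method, uses `functools.total_ordering` so that all comparison operators are defined. The attribute names `face_value` and `_key` are choices made for this sketch.

```python
from functools import total_ordering


@total_ordering
class Girl:
    """Sort key: compare by face value first, then by age."""

    def __init__(self, face_value, age):
        self.face_value = face_value
        self.age = age

    def _key(self):
        return (self.face_value, self.age)

    def __eq__(self, other):
        return self._key() == other._key()

    def __lt__(self, other):
        return self._key() < other._key()


rows = [("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)]

# Descending sort with the custom class as the key.
by_class = sorted(rows, key=lambda r: Girl(r[1], r[2]), reverse=True)
# A plain tuple key gives the same ordering without a class.
by_tuple = sorted(rows, key=lambda r: (r[1], r[2]), reverse=True)

print([r[0] for r in by_class])  # ['JuJingYi', 'yuihatano', 'angelababy']
```

When the ordering is just a lexicographic comparison of fields, as here, the tuple key is usually the simpler choice; a class is mainly useful when the comparison rule cannot be expressed that way.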

6. JDBC

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local")\
            .getOrCreate()
    # SparkSession exposes the DataFrame reader directly, so no SQLContext is needed
    df = spark.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/hellospark",
        driver="com.mysql.jdbc.Driver",
        dbtable="(select * from actor) tmp",
        user="root",
        password="123456").load()
    # show() prints the rows itself and returns None, so it is not wrapped in print()
    df.select('description', 'age').show(2)
    # df.printSchema()

    spark.stop()

   The result is as follows:
[Screenshot: program output]
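
The long `options(...)` call is easier to read and reuse when the connection settings are built separately. The helper below is a hypothetical convenience function, not part of PySpark; it only assembles the same keyword options used above, so it runs without Spark or a database.

```python
def jdbc_options(host, port, database, query, user, password,
                 driver="com.mysql.jdbc.Driver"):
    """Build the keyword options for a JDBC read (hypothetical helper)."""
    return {
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "driver": driver,
        # Spark accepts a parenthesised subquery with an alias as dbtable.
        "dbtable": f"({query}) tmp",
        "user": user,
        "password": password,
    }


opts = jdbc_options("localhost", 3306, "hellospark",
                    "select * from actor", "root", "123456")
print(opts["url"])
print(opts["dbtable"])

# With a live SparkSession the dictionary can be unpacked into the reader:
# df = spark.read.format("jdbc").options(**opts).load()
```

Whichever form is used, the MySQL JDBC driver jar has to be available on Spark's classpath, for example through the `spark.jars` setting; otherwise the load fails because the driver class cannot be found.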
