赞
踩
MapReduce框架局限性
Hadoop生态圈
批处理:MapReduce、Hive、Pig
流式计算:Storm
交互式计算:Impala、presto
需要一种灵活的框架可同时进行批处理、流式计算、交互式计算
./pyspark
sc = spark.sparkContext
words = sc.textFile('file:///home/hadoop/tmp/word.txt') \
.flatMap(lambda line: line.split(" ")) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: a + b).collect()
输出结果
[('python', 2), ('hadoop', 1), ('bc', 1), ('foo', 4), ('test', 2), ('bar', 2), ('quux', 2), ('abc', 2), ('ab', 1), ('you', 1), ('ac', 1), ('bec', 1), ('by', 1), ('see', 1), ('labs', 2), ('me', 1), ('welcome', 1)]
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
不可变 Rdd1->rdd2
可分区 partition
并行计算
第一步 创建sparkContext
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
创建RDD
>>>sc
<SparkContext master=local[*] appName=PySparkShell>
Parallelized Collections方式创建RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
>>> data = [1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
>>> data
[1, 2, 3, 4, 5]
>>> distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175
>>> distData = sc.parallelize(data,5)
>>> distData.reduce(lambda a, b: a + b)
15
Spark将为群集的每个分区(partition)运行一个任务(task)。 通常,可以根据CPU核心数量指定分区数量(每个CPU有2-4个分区)如未指定分区数量,Spark会自动设置分区数。
通过外部数据创建RDD
PySpark可以从Hadoop支持的任何存储源创建RDD,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等
支持整个目录、多文件、通配符
支持压缩文件
>>> rdd1 = sc.textFile('file:///root/tmp/word.txt')
>>> rdd1.collect()
['foo foo quux labs foo bar quux abc bar see you by test welcome test', 'abc labs foo me python hadoop ab ac bc bec python']
RDD 支持两种类型的操作:
transformation
action
所有的transformation操作都是惰性的(lazy)
不会立即计算结果
只记下应用于数据集的transformation操作
只有调用action一类的操作之后才会计算所有transformation
这种设计使Spark运行效率更高
例如map reduce 操作,map创建的数据集将用于reduce,map阶段的结果不会返回,仅会返回reduce结果。
persist 操作
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd2 = rdd1.map(lambda x: x+1)
>>> rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> def add(x):
... return x+1
...
>>> rdd2 = rdd1.map(add)
>>> rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]
filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回
>>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
>>> rdd2 = rdd1.map(lambda x:x*2)
>>> rdd3 = rdd2.filter(lambda x:x>4)
>>> rdd3.collect()
[6, 8, 10, 12, 14, 16, 18]
>>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
>>> rdd2 = rdd1.flatMap(lambda x:x.split(" "))
>>> rdd2.collect()
['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']
>>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
>>> rdd2 = rdd1.map(lambda x:x.split(" "))
>>> rdd2.collect()
[['a', 'b', 'c'], ['d', 'e', 'f'], ['h', 'i', 'j']]
union
>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
[('a', 1), ('b', 2), ('c', 1), ('b', 3)]
intersection
>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd4 = rdd3.intersection(rdd2)
>>> rdd4.collect()
[('c', 1), ('b', 3)]
>>> rdd1 = sc.parallelize([("a",1),("b",2)])
>>> rdd2 = sc.parallelize([("c",1),("b",3)])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd4 = rdd3.groupByKey()
>>> rdd4.collect()
[('a', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5898>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5518>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5f28>)]
>>> result[2]
('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>)
>>> result[2][1]
<pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>
>>> list(result[2][1])
[2, 3]
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> rdd.reduceByKey(lambda x,y:x+y).collect()
[('b', 1), ('a', 2)]
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey().first()
('1', 3)
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
collect
返回一个list,list中包含 RDD中的所有元素
只有当数据量较小的时候使用Collect 因为所有的结果都会加载到内存中
reduce
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
>>> rdd1 = sc.parallelize([1,2,3,4,5])
>>> rdd1.reduce(lambda x,y : x+y)
15
>>> sc.parallelize([2, 3, 4]).first()
2
>>> sc.parallelize([2, 3, 4, 5, 6]).take(2)
[2, 3]
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]
>>> sc.parallelize([2, 3, 4]).count()
3
目标:
将spark目录下的python目录下的pyspark整体拷贝到pycharm使用的python环境下
将下图中的pyspark, 拷贝到pycharm使用的:xxx\Python\Python36\Lib\site-packages目录下
import sys from pyspark.sql import SparkSession if __name__ == '__main__': if len(sys.argv) != 2: print("Usage: avg <input>", file=sys.stderr) sys.exit(-1) spark = SparkSession.builder.appName("test").getOrCreate() sc = spark.sparkContext counts = sc.textFile(sys.argv[1]) \ .flatMap(lambda line: line.split(" ")) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a + b) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) sc.stop()
将代码上传到远程cent-os系统上
在系统上执行指令
spark-submit --master local wc.py file:///root/bigdata/data/spark_test.log
在新闻类网站中,经常要衡量一条网络新闻的页面访问量,最常见的就是uv和pv,如果在所有新闻中找到访问最多的前几条新闻,topN是最常见的指标。
#每条数据代表一次访问记录 包含了ip 访问时间 访问的请求方式 访问的地址...信息
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
pv:网站的总访问量
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pv").getOrCreate()
sc = spark.sparkContext
rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
#把每一行数据记为("pv",1)
rdd2 = rdd1.map(lambda x:("pv",1)).reduceByKey(lambda a,b:a+b)
rdd2.collect()
sc.stop()
uv:网站的独立用户访问量
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pv").getOrCreate()
sc = spark.sparkContext
rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
#对每一行按照空格拆分,将ip地址取出
rdd2 = rdd1.map(lambda x:x.split(" ")).map(lambda x:x[0])
#把每个ur记为1
rdd3 = rdd2.distinct().map(lambda x:("uv",1))
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
rdd4.saveAsTextFile("hdfs:///uv/result")
sc.stop()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("topN").getOrCreate()
sc = spark.sparkContext
rdd1 = sc.textFile("file:///root/bigdata/data/access.log")
#对每一行按照空格拆分,将url数据取出,把每个url记为1
rdd2 = rdd1.map(lambda x:x.split(" ")).filter(lambda x:len(x)>10).map(lambda x:(x[10],1))
#对数据进行累加,按照url出现次数的降序排列
rdd3 = rdd2.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],ascending=False)
#取出序列数据中的前n个
rdd4 = rdd3.take(5)
rdd4.collect()
sc.stop()
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,纬度
2、 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
3、 比较的时候采用二分法查找,找到对应的经度和纬度
4、对相同的经度和纬度做累计求和
from pyspark.sql import SparkSession # 255.255.255.255 0~255 256 2^8 8位2进制数 32位2进制数 #将ip转换为特殊的数字形式 223.243.0.0|223.243.191.255| 255 2^8 #11011111 #00000000 #1101111100000000 # 11110011 #11011111111100110000000000000000 def ip_transform(ip): ips = ip.split(".")#[223,243,0,0] 32位二进制数 ip_num = 0 for i in ips: ip_num = int(i) | ip_num << 8 return ip_num #二分法查找ip对应的行的索引 def binary_search(ip_num, broadcast_value): start = 0 end = len(broadcast_value) - 1 while (start <= end): mid = int((start + end) / 2) if ip_num >= int(broadcast_value[mid][0]) and ip_num <= int(broadcast_value[mid][1]): return mid if ip_num < int(broadcast_value[mid][0]): end = mid if ip_num > int(broadcast_value[mid][1]): start = mid def main(): spark = SparkSession.builder.appName("test").getOrCreate() sc = spark.sparkContext city_id_rdd = sc.textFile("file:///root/tmp/ip.txt").map(lambda x:x.split("|")).map(lambda x: (x[2], x[3], x[13], x[14])) #创建一个广播变量 city_broadcast = sc.broadcast(city_id_rdd.collect()) dest_data = sc.textFile("file:///root/tmp/20090121000132.394251.http.format").map( lambda x: x.split("|")[1]) #根据取出对应的位置信息 def get_pos(x): city_broadcast_value = city_broadcast.value #根据单个ip获取对应经纬度信息 def get_result(ip): ip_num = ip_transform(ip) index = binary_search(ip_num, city_broadcast_value) #((纬度,精度),1) return ((city_broadcast_value[index][2], city_broadcast_value[index][3]), 1) x = map(tuple,[get_result(ip) for ip in x]) return x dest_rdd = dest_data.mapPartitions(lambda x: get_pos(x)) #((纬度,精度),1) result_rdd = dest_rdd.reduceByKey(lambda a, b: a + b) print(result_rdd.collect()) sc.stop() if __name__ == '__main__': main()
要统计Ip所对应的经纬度, 每一条数据都会去查询ip表
每一个task 都需要这一个ip表, 默认情况下, 所有task都会去复制ip表
实际上 每一个Worker上会有多个task, 数据也是只需要进行查询操作的, 所以这份数据可以共享,没必要每个task复制一份
可以通过广播变量, 通知当前worker上所有的task, 来共享这个数据,避免数据的多次复制,可以大大降低内存的开销
sparkContext.broadcast(要共享的数据)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。