赞
踩
20200806 -
在编程过程中,遇到这样一个问题,给定一个范围,需要通过对这个范围的内容数值进行运算来返回结果。说白了,就是进行爆破。但是这个数据量太大了,0-0xffffffff
,可以想象这个数据量有多大。一开始的时候我在想使用python的多线程/多进程来完成这个工作,但是我记得python有一个全局锁,并不是真正意义上的多线程(这部分知识需要回顾一下)。而且想利用多台机器来实现这个运算过程,就考虑使用spark或者hadoop来进行这部分运算来实现。
(20200807 增加)
前文内容为原始部分针对数值线性增长类型进行描述,下文为组合类型。
前面的描述是针对某种数字进行爆破,但是目前又遇到了另外一个问题,取值范围是,
tmp_str = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
print(len(tmp_str))
#output:
#62
import math
int(math.pow(62,16))
#output: 47672401706823532720689250304L
从上述字符串中随机取出16个(可重复,放回式取出,可重复的排列),这样的组合空间,最后一行是全部的数量。个人认为,我面临的这个问题我利用爆破的手法可能不太对;这里仅仅思考这类组合方式应该利用什么方法来实现spark的编程。
平时的时候,使用spark都是直接面向数据来处理,读取文件,然后进行相应的查询各种操作。但是,这里的一个问题是这样的。
问题描述:
对某个函数生成的字符串进行爆破,函数的参数是0x0-0xffffffff
之间的无符号整数。
那么,问题本质上就是通过遍历这个数组,并查看数值作为参数返回的结果是否满足要求。
目前,我就是python的for循环来实现计算,但是单进程的方式毕竟要慢很多,所以希望通过spark来实现大数据量的计算(这里暂时不讨论hadoop的内容)。
其实,我感觉这个任务也简单,就是生成一个大数组,然后将这个数组分发出去,然后worker分别执行这些计算任务。(不过,我感觉好像这种任务跟过年的时候,雪松问我的那个分布式框架很像,就是把任务分散出去的内容)但是,对于一些细节,不是很清楚,这里来简单描述一下。
import os
##下面这个语句,我有印象,好像是为了满足某些shell的什么东西吧,但是删除了依然好使
os.environ['PYSPARK_PYTHON']='/usr/bin/python2'
spark_home = os.getenv("SPARK_HOME")
#将pyspark的位置增加到python的库搜索路径
import sys
sys.path.insert(0,os.path.join(spark_home,"python"))
#获取sparkSession
from pyspark.sql import SparkSession
def get_spark_session(app_name = None, master = "spark://master:7077"):
spark = SparkSession.builder\
.master(master) \
.appName(app_name) \
.config("ui.showConsoleProgress","true") \
.config("spark.executor.cores","3") \
.config("spark.executor.memory","10G") \
.getOrCreate()
return spark
import time
now_date_str = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
Your_app_name = ""
spark = get_spark_session(Your_app_name + now_date_str)
count_list = spark.sparkContext.parallelize([i for i in xrange(0xffffffff)])
直接生成这个数组,但是由于这部分工作是在driver上直接执行的(即使设置了parallelize的分片数量,因为首先要将这个数组生成在driver内存中),最终必然导致了内存不够用,然后应用失败,如下图2-1所示。如果实现小数据量还行。
但是这种方式对于小数据量的情况下,应该是可行的;那么首先利用小数据量来实现以下后续的代码。
假设我要做的事情是,返回列表中每个数值的平方。
compute_array = spark.sparkContext.parallelize([i for i in xrange(0xffff)])
def square(x):
return x*x
compute_array.map(square).take(10)
# output:
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
从结果来看,证明了这种方式是可行的。
在小数据量的情况下,能够利用这种方式实现这个任务;但是在大数据量的情况下,因为生成数组的时候是在driver的机器上,所以必须将这部分任务转移。这里,问题就到达了,怎么生成这么大的RDD问题上来。
思考以下正常的情况下,如果是读取文件的形式的话,是每个excutor来读取文件,这样就将内存给分开了。如果是这样的话,我可以将所有的任务都写到文件中,然后再将这个文件上传至HDFS中,通过然后加载文件来执行。不过我感觉,可能这种方法生成这个数据就挺慢的。这也是一个问题。
假设我使用python来进行这个任务(采用多线程的形式),那么我可以利用线程池的方式,然后利用生成器生成这个数组,也就是说,我启动了十个线程(假设),他们各自从生成器取出数据,然后进行运算,最后将运算的结果返回,或生成文件,或直接打印日志。
那么,如果想将这种模型移植到spark中,应该怎么处理呢?
(我个人感觉通过上面这种形式好像移植不过来)
这里主要的问题就是,平时如果是想驱动其他excutor来执行任务,都是使用map这种函数,然后他们处理的逻辑也是一行一行的,然后返回每行的结果。这里我有一个想法,是不是可以通过先将这个数组变小,比如变小100倍,或者说,我每行的内容就是一个数据的范围。
*但是这种方案,在第一想法中感觉不太对的就是,本来他返回的是一行一行的,按照key来对应,但是这里我返回的就是一堆了,这种可能不太对应于原来的模型。*如果真的是这种情况,需要每行数据都返回一个结果,那这个数据量本来就很大。
好,来描述一下具体内容,忽略上面返回结果的问题,那么我要做的就是,我给每个excutor一个范围,如果这个范围内的数字经过了某个函数之后,能够满足我的要求,他就返回这个内容。
some_value = 4200000000
start_pos = 0
one_count = 100000
end_pos = 0
compute_array = []
while end_pos < some_value - 1:
end_pos += one_count
compute_array.append((start_pos,end_pos))
start_pos = end_pos + 1
compute_array_rdd = spark.sparkContext.parallelize(compute_array, 100)
def test_fun(value):
x,y = value
i = 0
tmp_value = x
return_array = []
while tmp_value <= y:
if tmp_value % 521 == 0:
return_array.append(tmp_value)
tmp_value += 1
return ",".join(map(str,return_array))
return_rdd = compute_array_rdd.map(test_fun)
代码如下,其中测试函数是计算某个数值是否能整除521,同时将上面的RDD分成了100份,计算还是很快的;同时返回的内容是字符串,来满足一行的需求。
上述的解决方案都是将生成的数组缩小到driver可以执行的程度,本质上是将内存的需求转嫁到excutor上。(这里也存在一个问题,当然我程序没有这样的顾虑,那就是如果你要生成的结果占用内存越来越大,同样也会导致程序崩溃)
本小节中针对的类型是数值型,对于大的数值取值范围,是在driver将分割数组的工作完成。但我个人思考后觉得,可以直接只传递一个1-100
(举例说明),然后让excutor自己去选择自己的范围即可。
本问题属于从备选字符集(或其他内类型集合)中选取多个字符生成相应的组合结果,利用组合结果进行分布式计算的过程;如果从树数据结构的来思考,那么我就是遍历一个从根节点之后,每次都分支备选集个数的叶节点,一直递归到相应的组合长度的深度。
在2.3.3中提到,通过将实际的内存占用过程,或者说子空间的遍历行为转嫁到excutor上,以此缓解driver的压力。那么这种树结构形式的方法也可以尝试尝试,但是必然要保证遍历空间一定能完全遍历。
本部分关于如何设置spark的环境不再赘述,与前文一致。
跟第二小节一样,如果能够将这部分全部生成,就采用生成的方式即可。假设,我要从0-9
字符中随机选取,获取全部的长度为l
的字符串。
char_set = '0123456789'
char_set_len = len(char_set)
#res_len = 4
i = j = k = l = 0
str_res = []
for i in range(char_set_len):
for j in range(char_set_len):
for k in range(char_set_len):
for l in range(char_set_len):
str_res.append(char_set[i] + char_set[j] + char_set[k] + char_set[l])
print(len(str_res))
#output: 10000
print(str_res[:10])
'''
output:
[
'0000',
'0001',
'0002',
'0003',
'0004',
'0005',
'0006',
'0007',
'0008',
'0009',
]
'''
这种方法的代码,看起来很简单,如果最终字符串的长度越来越长,要管理的for循环个数也会越来越多,小长度的情况可以使用。
前面已经提到,这种取集合的方式可以看成一个树,在分支节点上,每次都选择一个节点,一直到最深的节点,那么可以用递归的方法来描述上述代码。
关于递归的形式,暂时有两个问题要考虑:
import copy
str_res = []
#生成的字符长度
l = 4
#可选字符集合
char_set = '0123456789'
char_set_len = len(char_set)
tmp_str = ['0' for i in range(l)]
def recursive_str_fun(depth):
if depth == l:
str_res.append(copy.copy("".join(tmp_str)))
return
for i in range(char_set_len):
tmp_str[depth] = char_set[i]
recursive_str_fun(depth + 1)
resursive_str_fun(0)
print(len(str_res))
#output: 10000
print(str_res[:10])
'''
output:
[
'0000',
'0001',
'0002',
'0003',
'0004',
'0005',
'0006',
'0007',
'0008',
'0009',
]
'''
上面的方法是我自己思考的结果,其实python中已经自带了这部分功能,同时还可以写成生成器的形式这样更方便。
文章[1]中介绍了使用itertools生成排列组合的方式,对于我这部分功能的生成,以下代码即可完成。
import itertools
str_res=("".join(x) for x in itertools.product("0123456789",repeat=4))
同时这种功能还是一种生成器的变形[2],生成器的形式有两种,一种就是函数中使用yield关键词,另一种就是(i for i in range(10)),当然[i for i in range(10)]也是可以的。
对于长度比较长,而且备选数据集比较大的场景下,同样利用树的结构来考虑。因为最后一个字符只有在最底层的时候才能生成,那么可以通过挑选前面的一些枝干作为种子,传递给excutor。也就是说,将将树结构根节点分支的前几层作为种子。
前几层的种子可以直接使用3.2小节的第三种方法生成,后面也一样。也就是说,在driver中,生成前几层的种子,excutor来生成后面的字符。
import itertools
mylist=("".join(x) for x in itertools.product("0123456789abcdefghijklmn",repeat=4))
def check_func(prefix_str):
tmp_list = ("".join(x) for x in itertools.product("0123456789abcdefghijklmn",repeat=4))
for one in tmp_list:
if prefix_str + one == "01234567":
return prefix_str + one
else:
continue
return ""
test_list_rdd = spark.sparkContext.parallelize(mylist,100)
return_rdd = test_list_rdd.map(check_func)
res_rdd = return_rdd.filter(lambda x: len(x)>0)
res_rdd.count()
(注意,这里其实有一个小bug,就是我之前以为[]中的变量x不会被外面访问,但是实际情况是他访问了,当时我把()换成了[],就一直出现这个我问题,所以一定要注意,尽量避免变量的重名,按说[]内的变量作用域应该是释放了,主要是因为参数名字的问题,所以出现了这种问题。)
上述代码的意义是,字符集"0123456789abcdefghijklmn"
,选取四个生成种子文件,然后执行map函数让excutor来生成剩余的字符,同时查看是否匹配。
上面这个代码看起来挺简单,最后在spark集群上还是运行了13分钟,看来这个运算量还是很大的。
虽然任务成功了,但我觉得还是有提升的空间,就是是不是还有别的方法能够完成这种任务呢?按说这种任务的需求应该挺大的。
同时,针对这部分的性能优化,应该考虑具体的机器配置,这部分后续再进行优化。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。