鉴于 RDD:
+----------+----------+------------------------+
| a | me | [(1;1); (10;2); (5;3)] |
| | | |
| b | dog | [(1;3); (10;4); (2;4)] |
+----------+----------+------------------------+
我希望映射每一行,以便它的子数组按键(每个元组的第一个元素)排序,而不使用内置的 Python 函数,因为这些不是分布式函数。子列表的大小也很大。排序后的输出将如下所示:
+----------+----------+------------------------+
| a | me | [(1;1); (5;3); (10;2)] |
| | | |
| b | dog | [(1;3); (2;4); (10;4)] |
+----------+----------+------------------------+
你可以对待分号“;” 作为逗号。它们被使用,所以我使用的表生成器不会将数组分成几部分。
原始输入:
(a,me,[(1,1),(10,2),(5,3)])
(b,dog,[(1,3),(10,4),(2,4)])
原始输出:
(a,me,[(1,1),(5,3),(10,2)])
(b,dog,[(1,3),(2,4),(10,4)])
目前我正在使用它以及一个简单的 map() 调用来对子列表进行排序:
def sort_sublist(row):
return (row[0], row[1], sorted(row[2], key=lambda tup: int(tup[0])))
...
my_rdd = my_rdd.map(lambda row: sort_sublist(row))
该函数使用未分发的 Python 的 sorted() 函数。为了使 map-reduce 算法更高效,我需要找到一种方法来使用 Apache Spark 的函数(map()、reduce() 等)来完成上述函数中的操作。
一个主意:
我已经完成了以下伪代码:
[ (a,me,[(1,1),(5,3),(10,2)]),
(b,dog,[(1,3),(2,4),(10,4)]) ]
=> map =>
[ [(1,1),(5,3),(10,2)],
[(1,3),(2,4),(10,4)] ]
=> zipWithIndex =>
[ ([(1,1),(5,3),(10,2)], 0),
([(1,3),(2,4),(10,4)], 1) ]
=> flatMap =>
[ ( (0,(1,1)),(0,(5,3)),(0,(10,2)) ),
( (1,(1,3)),(1,(2,4)),(1,(10,4)) ) ]
这是我遇到麻烦的地方。如果我使用字符串连接创建唯一的字符串键:
尝试 1:
=> map =>
[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]
=> key string concatenation =>
[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(12,(2,4)),(110,(10,4)) ]
=> sortByKey =>
[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(110,(10,4)),(12,(2,4)) ]
ISSUE 1:第二行的顺序不对。如果我使用整数创建唯一键:
尝试 2:
=> map =>
[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]
=> key integer sum =>
[ (1,(1,1)),(5,(5,3)),(10,(10,2)),
(2,(1,3)),(3,(2,4)),(11,(10,4)) ]
=> sortByKey =>
[ (1,(1,1)),(2,(1,3)),(3,(2,4)),
(5,(5,3)),(10,(10,2)),(11,(10,4)) ]
问题 2:行的顺序在此过程中丢失。
问题的要点是找到一种方法来保持行的顺序,同时能够为该行中的每个键提供一个值进行排序,以便每行的元组对按元组的键整数值排序。我的方法可能不是解决方案。我也是 Apache Spark 的新手,所以希望对其内部工作有更多了解的人可以提供一些有关是否有办法实现这一目标的见解。
自从我在 Spark 工作已经很长时间了,但据我所知,这不是一个sorted
非分布式问题,因为它会在调用 inside 时按每个 RDD 分区应用map()
。
不过,如果你真的想避免sorted
,这里有一个相当笨拙的方法来实现你的目标:
import pyspark
sc = pyspark.SparkContext()
# load data
data = [('a','me',[(1,1),(10,2),(5,3)]),
('b','dog',[(1,3),(10,4),(2,4)])]
rdd = sc.parallelize(data)
# perform sorting
(rdd.map(lambda x: (x[0],x[1]))
.zipWithIndex()
.map(lambda x: (x[1],x[0]))
.join(
rdd.map(lambda x: x[2])
.zipWithIndex()
.flatMap(lambda x: [(x[1],y) for y in x[0]])
.map(lambda x: (x[1][0], (x[1][1], x[0])))
.sortByKey()
.map(lambda x: (x[1][1], (x[0], x[1][0])))
.groupByKey()
.map(lambda x: (x[0], list(x[1])))
)
.map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
.collect()
)
里面的代码join()
对元组的内部列表进行排序。在join()
和周围的代码是用来连接排序的元组回的字符串项(“A”,“我”,等等),他们开始使用。
更新
响应关于效率的评论问题,sorted
肯定比我上面提供的解决方案快。下面是一些示例数据来演示,子列表中有 10,000 个元组:
import numpy as np
minval = 1
maxval = 11
N = 10000
tup_list1 = zip(np.random.randint(minval,maxval,N),
np.random.randint(minval,maxval,N))
tup_list2 = zip(np.random.randint(minval,maxval,N),
np.random.randint(minval,maxval,N))
data = [('a','me',tup_list1),
('b','dog',tup_list2)]
rdd = sc.parallelize(data)
无需排序,使用上述方法:
%timeit (rdd.map(lambda x: (x[0],x[1]))
.zipWithIndex()
.map(lambda x: (x[1],x[0]))
.join(rdd.map(lambda x: x[2])
.zipWithIndex()
.flatMap(lambda x: [(x[1],y) for y in x[0]])
.map(lambda x: (x[1][0], (x[1][1],x[0])))
.sortByKey()
.map(lambda x: (x[1][1], (x[0],x[1][0])))
.groupByKey()
.map(lambda x: (x[0],list(x[1]))))
.map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
.collect())
回复:
# The slowest run took 25.94 times longer than the fastest.
# This could mean that an intermediate result is being cached.
# 1 loop, best of 3: 1.18 s per loop
OP 的原始方法,使用sorted
:
%timeit (rdd.map(lambda x: (x[0],x[1], sorted(x[2], key=lambda tup: int(tup[0]))))
.collect())
回复:
# 1 loop, best of 3: 193 ms per loop
小心使用 可以提高一些速度cache()
,但仍然sorted
是这里更简单和更快的解决方案。
这对于 map/reduce 范式来说并不是一个很好的用例;在我的回答中,我有点强迫它。当有许多重复键时,Map/reduce 会更强大,并且可以应用有用的函数来聚合这些键的各自值。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句