Pyspark 高效的 map-reduce 算法在分布式系统中对子列表进行排序

多宝

鉴于 RDD:

+----------+----------+------------------------+
| a        | me       | [(1;1); (10;2); (5;3)] |
|          |          |                        |
| b        | dog      | [(1;3); (10;4); (2;4)] |
+----------+----------+------------------------+

我希望映射每一行,以便它的子数组按键(每个元组的第一个元素)排序,而不使用内置的 Python 函数,因为这些不是分布式函数。子列表的大小也很大。排序后的输出将如下所示:

+----------+----------+------------------------+
| a        | me       | [(1;1); (5;3); (10;2)] |
|          |          |                        |
| b        | dog      | [(1;3); (2;4); (10;4)] |
+----------+----------+------------------------+

你可以对待分号“;” 作为逗号。它们被使用,所以我使用的表生成器不会将数组分成几部分。

原始输入:

(a,me,[(1,1),(10,2),(5,3)])
(b,dog,[(1,3),(10,4),(2,4)])

原始输出:

(a,me,[(1,1),(5,3),(10,2)])
(b,dog,[(1,3),(2,4),(10,4)])

目前我正在使用它以及一个简单的 map() 调用来对子列表进行排序:

def sort_sublist(row):
    return (row[0], row[1], sorted(row[2], key=lambda tup: int(tup[0])))
...
my_rdd = my_rdd.map(lambda row: sort_sublist(row))

该函数使用未分发的 Python 的 sorted() 函数。为了使 map-reduce 算法更高效,我需要找到一种方法来使用 Apache Spark 的函数(map()、reduce() 等)来完成上述函数中的操作。

一个主意:

我已经完成了以下伪代码:

[ (a,me,[(1,1),(5,3),(10,2)]),
(b,dog,[(1,3),(2,4),(10,4)]) ]

=> map =>

[ [(1,1),(5,3),(10,2)],
[(1,3),(2,4),(10,4)] ]

=> zipWithIndex =>

[ ([(1,1),(5,3),(10,2)], 0),
([(1,3),(2,4),(10,4)], 1) ]

=> flatMap =>

[ ( (0,(1,1)),(0,(5,3)),(0,(10,2)) ),
( (1,(1,3)),(1,(2,4)),(1,(10,4)) ) ]

这是我遇到麻烦的地方。如果我使用字符串连接创建唯一的字符串键:

尝试 1:

=> map =>

[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]

=> key string concatenation =>

[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(12,(2,4)),(110,(10,4)) ]

=> sortByKey =>

[ (01,(1,1)),(05,(5,3)),(010,(10,2)),
(11,(1,3)),(110,(10,4)),(12,(2,4)) ]

ISSUE 1:第二行的顺序不对。如果我使用整数创建唯一键:

尝试 2:

=> map =>

[ (0+1,(1,1)),(0+5,(5,3)),(0+10,(10,2)),
(1+1,(1,3)),(1+2,(2,4)),(1+10,(10,4)) ]

=> key integer sum =>

[ (1,(1,1)),(5,(5,3)),(10,(10,2)),
(2,(1,3)),(3,(2,4)),(11,(10,4)) ]

=> sortByKey =>

[ (1,(1,1)),(2,(1,3)),(3,(2,4)),
(5,(5,3)),(10,(10,2)),(11,(10,4)) ]

问题 2:行的顺序在此过程中丢失。

问题的要点是找到一种方法来保持行的顺序,同时能够为该行中的每个键提供一个值进行排序,以便每行的元组对按元组的键整数值排序。我的方法可能不是解决方案。我也是 Apache Spark 的新手,所以希望对其内部工作有更多了解的人可以提供一些有关是否有办法实现这一目标的见解。

安德鲁里斯

自从我在 Spark 工作已经很长时间了,但据我所知,这不是一个sorted非分布式问题,因为它会在调用 inside 时按每个 RDD 分区应用map()

不过,如果你真的想避免sorted,这里有一个相当笨拙的方法来实现你的目标:

import pyspark
sc = pyspark.SparkContext() 

# load data
data = [('a','me',[(1,1),(10,2),(5,3)]),
        ('b','dog',[(1,3),(10,4),(2,4)])]
rdd = sc.parallelize(data)

# perform sorting
(rdd.map(lambda x: (x[0],x[1]))
    .zipWithIndex()
    .map(lambda x: (x[1],x[0]))
    .join(
        rdd.map(lambda x: x[2])
           .zipWithIndex()
           .flatMap(lambda x: [(x[1],y) for y in x[0]])
           .map(lambda x: (x[1][0], (x[1][1], x[0])))
           .sortByKey()
           .map(lambda x: (x[1][1], (x[0], x[1][0])))
           .groupByKey()
           .map(lambda x: (x[0], list(x[1])))
    )
    .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
    .collect()
)

里面的代码join()对元组的内部列表进行排序。join()和周围的代码是用来连接排序的元组回的字符串项(“A”,“我”,等等),他们开始使用。

更新
响应关于效率的评论问题,sorted肯定比我上面提供的解决方案快。下面是一些示例数据来演示,子列表中有 10,000 个元组:

import numpy as np
minval = 1
maxval = 11
N = 10000
tup_list1 = zip(np.random.randint(minval,maxval,N),
                np.random.randint(minval,maxval,N))
tup_list2 = zip(np.random.randint(minval,maxval,N),
                np.random.randint(minval,maxval,N))

data = [('a','me',tup_list1),
        ('b','dog',tup_list2)]
rdd = sc.parallelize(data)

无需排序,使用上述方法:

%timeit (rdd.map(lambda x: (x[0],x[1]))
            .zipWithIndex()
            .map(lambda x: (x[1],x[0]))
            .join(rdd.map(lambda x: x[2])
                     .zipWithIndex()
                     .flatMap(lambda x: [(x[1],y) for y in x[0]])
                     .map(lambda x: (x[1][0], (x[1][1],x[0]))) 
                     .sortByKey()
                     .map(lambda x: (x[1][1], (x[0],x[1][0])))
                     .groupByKey()
                     .map(lambda x: (x[0],list(x[1]))))
                     .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1]))
                     .collect())

回复:

# The slowest run took 25.94 times longer than the fastest.  
# This could mean that an intermediate result is being cached.
# 1 loop, best of 3: 1.18 s per loop

OP 的原始方法,使用sorted

%timeit (rdd.map(lambda x: (x[0],x[1], sorted(x[2], key=lambda tup: int(tup[0]))))
            .collect())

回复:

# 1 loop, best of 3: 193 ms per loop

小心使用 可以提高一些速度cache(),但仍然sorted是这里更简单和更快的解决方案。

这对于 map/reduce 范式来说并不是一个很好的用例;在我的回答中,我有点强迫它。当有许多重复键时,Map/reduce 会更强大,并且可以应用有用的函数来聚合这些键的各自值。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Map Reduce分布式缓存

来自分类Dev

在pyspark中高效地以分布式方式生成大型DataFrame(无需pyspark.sql.Row)

来自分类Dev

RavenDB map reduce,reduce中的重复条目

来自分类Dev

使用UnderscoreJS进行Map Reduce

来自分类Dev

使用UnderscoreJS进行Map Reduce

来自分类Dev

Map Reduce程序中的问题

来自分类Dev

Map Reduce中的关键值

来自分类Dev

用于日志分析的 Map Reduce 作业不在 Hadoop 2.7.3 伪分布式模式下运行

来自分类Dev

使用Map Reduce算法创建Rtree?

来自分类Dev

对map reduce中的reduce函数的行为感到困惑

来自分类Dev

在PySpark / Delta数据帧上高效执行

来自分类Dev

完成MongoDB Map-Reduce中的步骤

来自分类Dev

完成MongoDB Map-Reduce中的步骤

来自分类Dev

高效的分布式算法,用于合并具有公共元素的集合

来自分类Dev

高效的分布式算法,用于合并具有公共元素的集合

来自分类Dev

向量迭代排序的高效算法

来自分类Dev

高效的knn算法

来自分类Dev

高效的混叠算法

来自分类Dev

矩阵计算的高效算法

来自分类Dev

MAP(PySpark)返回的元组列表(或迭代器)

来自分类Dev

高效的列MultiIndex排序

来自分类Dev

在C ++中的自定义unordered_map容器上实现高效插入

来自分类Dev

高效搜索嵌套列表

来自分类Dev

高效搜索嵌套列表

来自分类Dev

使用map / reduce在列表中添加数字对的差

来自分类Dev

pyspark中map函数内部的操作

来自分类Dev

高效反转排序列表

来自分类Dev

网格中的高效方法

来自分类Dev

堆中的高效簿记