Spark:在RDD map()中使用迭代器Lambda函数

插口

我在HDFS上有简单的数据集,正在将其加载到Spark中。看起来像这样:

1 1 1 1 1 1 1
1 1 1 1 1 1 1
1 1 1 1 1 1 1
...

基本上是一个矩阵 我正在尝试实现一些需要对矩阵行进行分组的事情,因此,我试图为每一行添加一个唯一键,如下所示:

(1, [1 1 1 1 1 ... ])
(2, [1 1 1 1 1 ... ])
(3, [1 1 1 1 1 ... ])
...

我尝试了一些天真的尝试:设置全局变量并编写一个lambda函数以遍历全局变量:

# initialize global index
global global_index
global_index = 0

# function to generate keys
def generateKeys(x):
    global_index+=1
    return (global_index,x)

# read in data and operate on it
data = sc.textFile("/data.txt")

...some preprocessing...

data.map(generateKeys)

而且它似乎不认识全局变量的存在。

有没有想到的简单方法可以做到这一点?

谢谢杰克

扬·维尔辛斯基
>>> lsts = [
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 1],
...     [1, 1, 1, 1, 1, 2],
...     [1, 1, 1, 2, 1, 2]
...     ]
...
>>> list(enumerate(lsts))
[(0, [1, 1, 1, 1, 1, 1]),
 (1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 2]),
 (7, [1, 1, 1, 2, 1, 2])]

enumerate 为可迭代项中的每个项生成唯一索引,并生成具有值的元组 (index, original_item)

如果要使用以外的编号开始编号0,请将起始值enumerate作为第二个参数传递

>>> list(enumerate(lsts, 1))
[(1, [1, 1, 1, 1, 1, 1]),
 (2, [1, 1, 1, 1, 1, 1]),
 (3, [1, 1, 1, 1, 1, 1]),
 (4, [1, 1, 1, 1, 1, 1]),
 (5, [1, 1, 1, 1, 1, 1]),
 (6, [1, 1, 1, 1, 1, 1]),
 (7, [1, 1, 1, 1, 1, 2]),
 (8, [1, 1, 1, 2, 1, 2])]

请注意,list用来获取实值,enumerate实值是从迭代器而不是函数返回列表的。

替代方案:全局可用的ID分配器

enumerate易于使用,但是如果您需要在不同的代码段中添加id,它将变得困难或不可能。在这种情况下,可以使用全球可用的生成器(如OP中的绘图器)。

itertools提供count可以满足我们需求的产品:

>>> from itertools import count
>>> idgen = count()

现在,我们已经idgen准备好(全球可用)生成器来生成唯一的ID。

我们可以通过一个函数prid(打印ID)对其进行测试

>>> def prid():
...     id = idgen.next()
...     print id
...
>>> prid()
0
>>> prid()
1
>>> prid()
2
>>> prid()
3

在工作时,我们可以在值列表上对其进行测试:

>>> lst = ['100', '101', '102', '103', '104', '105', '106', '107', '108', '109']

并定义实际函数,当使用值调用时将返回元组 (id, value)

>>> def assignId(val):
...     return (idgen.next(), val)
...

注意,不需要声明idgen为全局变量,因为我们不会更改其值(idgen调用时只会更改其内部状态,但仍保持相同的生成器)。

测试是否可行:

>>> assignId("ahahah")
(4, 'ahahah')

并尝试在列表上:

>>> map(assignId, lst)
[(5, '100'),
 (6, '101'),
 (7, '102'),
 (8, '103'),
 (9, '104'),
 (10, '105'),
 (11, '106'),
 (12, '107'),
 (13, '108'),
 (14, '109')]

enumerate解决方案的主要区别在于,我们可以在代码中的任何位置一个一个地分配id,而无需在所有处理中全部完成enumerate

>>> assignId("lonely line")
(15, 'lonely line')

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

了解Spark for RDD中的Lambda函数输入

来自分类Dev

使用SSD进行SPARK RDD

来自分类Dev

RDD.map 函数在 Spark 中挂起

来自分类Dev

何时在Spark中使用RDD和DataFrame

来自分类Dev

如何使用非 Lambda 函数定义 Spark RDD 转换

来自分类Dev

Apache Spark-使用2个RDD:RDD的补充

来自分类Dev

映射函数写入全局 spark rdd

来自分类Dev

使用Scala Apache Spark合并RDD

来自分类Dev

Spark:使用Stratio和RDD查询Mongodb

来自分类Dev

使用RDD的Spark流上下文

来自分类Dev

使用scala在spark中创建对RDD

来自分类Dev

在Scala Spark中未调用RDD的Map函数

来自分类Dev

Spark:缓存要在其他作业中使用的RDD

来自分类Dev

Scala/Spark:仅使用 RDD 函数将 DataFrame 展平

来自分类Dev

如何基于if条件在Spark rdd map动作中跳过行

来自分类Dev

除非访问了RDD中的项目,否则Spark的RDD.map()将不会执行

来自分类Dev

如何根据基于Spark中另一个RDD的函数过滤RDD?

来自分类Dev

通过Thrift服务器访问Spark SQL RDD表

来自分类Dev

如何使用 Spark RDD 生成或映射到另一个 RDD

来自分类常见问题

如何在Scala的Spark RDD中避免使用collect?

来自分类Dev

与两个RDD一起使用Apache Spark

来自分类Dev

使用Scala在Apache Spark中连接不同RDD的数据集

来自分类Dev

Spark:从多个文件排序RDD,而无需使用collect

来自分类Dev

如何在Scala的Spark RDD中避免使用collect?

来自分类Dev

将 CSV 转换为 RDD 并使用 Spark/Scala 读取

来自分类Dev

无法使用 spark RDD API 写入序列文件

来自分类Dev

如何使用 spark-scala 删除 rdd 中的 unicode?

来自分类Dev

如何使用 Spark Scala 加入 3 个 RDD

来自分类Dev

如何使用 Scala 语言将 Spark RDD 转换为 JSON

Related 相关文章

热门标签

归档