pyspark矩阵累加器

罗曼

我想相加填充与来自推断值的矩阵rdd使用pyspark累加器; 我发现文档有点不清楚。添加一些背景,以防万一。
MyrddData包含必须将一个计数添加到矩阵的索引列表。例如,此列表映射到索引:
[1,3,4] -> (11), (13), (14), (33), (34), (44)

现在,这是我的累加器:

from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
    def zero(self, mInitial):
        import numpy as np
        aaZeros = np.zeros(mInitial.shape)
        return aaZeros

    def addInPlace(self, mAdd, lIndex):
        mAdd[lIndex[0], lIndex[1]] += 1
        return mAdd

这是我的映射器函数:

def populate_sparse(lIndices):
    for i1 in lIndices:
        for i2 in lIndices:
            oAccumilatorMatrix.add([i1, i2])

然后运行数据:

oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())

rddData.map(populate_sparse).collect()

现在,当我查看数据时:

sum(sum(oAccumilatorMatrix.value))
#= 0.0

不应该的。我想念什么?

EDIT首先使用稀疏矩阵对此进行了尝试,得到了不支持稀疏矩阵的回溯。更改了稠密numpy矩阵的问题:

...

    raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.
罗曼

啊哈!我想我明白了。最终,累加器仍需要自己添加一些内容。因此,更改addInPlace为:

def addInPlace(self, mAdd, lIndex):
    if type(lIndex) == list:
        mAdd[lIndex[0], lIndex[1]] += 1
    else:
        mAdd += lIndex
    return mAdd

因此,现在在给定列表时添加索引,并在populate_sparse函数循环后添加自身以创建最终矩阵。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章