我正在尝试使用PySpark查找相邻元组列表之间的平均差。
例如,如果我有这样的RDD
vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
我想找到每个键的平均差异。
例如对于键值“ 2”
平均差异为(abs(110-130)+ abs(130-120))/ 2 = 15。
到目前为止,这是我的方法。我正在尝试更改平均计算代码以适应此情况。但这似乎不起作用。
from pyspark import SparkContext
aTuple = (0,0)
interval = vals.aggregateByKey(aTuple, lambda a,b: (abs(a[0] - b),a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
finalResult = interval.mapValues(lambda v: (v[0]/v[1])).collect()
我想使用RDD函数,不使用Spark SQL或任何其他附加程序包来执行此操作。
最好的方法是什么?
请让我知道,如果你有任何问题。
感谢您的时间。
我想出了一个幼稚的方法。我不确定这是否在所有情况下都适用。它像这样。
首先让我们计算一下移动平均值。如果这不是计算移动平均线的正确方法,请纠正我。
def get_abs(num_list):
'''
>>> get_abs([110, 130, 120])
15.0
'''
acc = 0
num_pairs = 0
for i in range(len(num_list)-1):
acc += abs(num_list[i]-num_list[i+1])
num_pairs +=1
return acc/num_pairs
接下来,我们并行化列表
>>> vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2, 110),
(2, 130),
(2, 120),
(3, 200),
(3, 206),
(3, 206),
(4, 150),
(4, 160),
(4, 170)]
然后,将属于同一列表的值分组。
>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4, [150, 160, 170]), (2, [110, 130, 120]), (3, [200, 206, 206])]
然后,我们只需要调用上面定义的函数来计算分组值的移动平均值。
>>> vals.mapValues(get_abs).collect()
[(4, 10.0), (2, 15.0), (3, 3.0)]
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句