在Spark Streaming中查找中位数

米哈尔

我正在尝试编写最简单的代码示例:

from numpy import median
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 30)

qs = ssc.queueStream([[1,2,3],[4,5],[6,7,8,9,9]])
output = qs.foreachRDD(median)

output.pprint()

ssc.start(); ssc.awaitTermination()

我想为流中的每个rdd生成中位数。我的流每30秒出现一次。为了测试我的代码,我做了一个queueStream

当我查看输出的类型时,我得到以下信息:

 type(output)
<type 'NoneType'>

为什么会这样呢?当我尝试median使用map应用于流时,它会将一次中值函数一次应用于列表的每个成员。我想将中值函数作为一个整体应用于整个RDD,因此该map函数是不可能的。

如何在Spark Streaming中计算流的中位数?

斯蒂芬·博斯

扩展@Justin的答案:发生了什么:

median()

应用于每个单独DSTREAM。但是结果没有被任何人使用..为什么?foreachRdd()是一个动作,而不是一个转换。

您应该查看DStream转换:例如map():这是尚未100%调试的代码-但它提供了一种结构:

from pyspark.streaming import *
ssc = StreamingContext(sc, 30)
dataRdd = [sc.parallelize(d, 1) for d in [[1,2,3],[4,5],[6,7,8,9,9]]]
qs = ssc.queueStream(dataRdd)

def list_median((med,mylist),newval):
    mylist = [newval] if not mylist else mylist.append(newval)
    mylist = sorted(mylist)
    return (mylist[int(len(mylist)/2)], mylist)

medians = qs.reduce(list_median).map(lambda (med,list): med)
def printRec(rdd):
    import sys
    rdd.foreach(lambda rec: sys.stderr.write(repr(rec)))

medians.foreachRDD(printRec)
ssc.start(); ssc.awaitTermination()

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在Spark Streaming中查找中位数

来自分类Dev

在python中查找中位数时出错

来自分类Dev

如何在Spark Dataframe中分组并获取中位数

来自分类Dev

查找数组的中位数

来自分类Dev

查找数组的中位数

来自分类Dev

Spark Streaming + Spark SQL

来自分类Dev

Spark Streaming + Spark SQL

来自分类Dev

从Excel文件中查找Python中的中位数

来自分类Dev

在二进制搜索树中查找中位数

来自分类Dev

在tcl中查找列表的中位数和平均值

来自分类Dev

PHP函数在MySQL中查找列的中位数

来自分类Dev

在C加号中通过函数查找中位数

来自分类Dev

在 C++ 数组中查找中位数(双精度)?

来自分类Dev

Spark Streaming中的顺序处理

来自分类Dev

Spark Streaming中的并发操作

来自分类Dev

在Spark Streaming中缓存DStream

来自分类Dev

查找图像列表的中位数

来自分类Dev

查找arrayList中位数的问题

来自分类Dev

按组在Spark-Scala中查找百分位数

来自分类Dev

如何使用Python Dataframe API在Apache Spark中找到中位数?

来自分类Dev

Scala中的Spark Streaming代码中的错误

来自分类Dev

Spark和Spark Streaming中的时间序列预测

来自分类Dev

Spark Structured Streaming / Spark SQL 中的条件爆炸

来自分类Dev

Python中列表的中位数

来自分类Dev

R中的群组中位数

来自分类Dev

组中SQLITE的中位数

来自分类Dev

Python中列表的中位数

来自分类Dev

Javascript中数组的中位数?

来自分类Dev

在Spark-Streaming中解析JSON