I have an RDD of Breeze vectors and want to compute their mean. My first approach was to use aggregate:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfterAll, FunSuite, Matchers, Suite }
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import breeze.linalg.{ Vector => BreezeVector }

class CalculateMean extends FunSuite with Matchers with GeneratorDrivenPropertyChecks with SparkSpec {

  test("Calculate mean") {
    type U = (BreezeVector[Double], Int)
    type T = BreezeVector[Double]

    val rdd: RDD[T] = sc.parallelize(List(1.0, 2, 3, 4, 5, 6).map { x => BreezeVector(x, x * x) }, 2)

    val zeroValue = (BreezeVector.zeros[Double](2), 0)
    val seqOp = (agg: U, x: T) => (agg._1 + x, agg._2 + 1)
    val combOp = (xs: U, ys: U) => (xs._1 + ys._1, xs._2 + ys._2)
    val mean = rdd.aggregate(zeroValue)(seqOp, combOp)

    println(mean._1 / mean._2.toDouble)
  }

}
/**
 * Setup and tear down spark context
 */
trait SparkSpec extends BeforeAndAfterAll {
  this: Suite =>

  private val master = "local[2]"
  private val appName = this.getClass.getSimpleName

  private var _sc: SparkContext = _

  def sc: org.apache.spark.SparkContext = _sc

  val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)

  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
    super.afterAll()
  }

}
However, this algorithm can be numerically unstable (see https://stackoverflow.com/a/1346890/1037094).

How can I implement Knuth's algorithm for Breeze vectors in Spark, and is rdd.aggregate the recommended way to do it?
aggregate could be a good way to do it, if the algorithm described by Knuth were the right choice. Unfortunately it isn't, or at least not without some tweaking: it is an inherently sequential streaming algorithm, and the function it applies is not associative. Let's assume you have a function knuth_mean. It should be clear that (ignoring counting and the single-element case):
(knuth_mean (knuth_mean (knuth_mean 1 2) 3) 4)
is not the same as
(knuth_mean (knuth_mean 1 2) (knuth_mean 3 4))
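To make the non-associativity concrete, here is a minimal sketch with a hypothetical knuthMean helper on plain doubles (ignoring the count, as above — knuthMean is an illustration, not part of any library):

```scala
// Hypothetical two-argument helper in the spirit of Knuth's update step:
// fold the second argument into the first as if it were one new observation.
def knuthMean(acc: Double, x: Double): Double = acc + (x - acc) / 2.0

// Left-nested, as a sequential stream would apply it:
val sequential = knuthMean(knuthMean(knuthMean(1.0, 2.0), 3.0), 4.0) // 3.125

// Tree-shaped, as a parallel combine would apply it:
val treeShaped = knuthMean(knuthMean(1.0, 2.0), knuthMean(3.0, 4.0)) // 2.5

println(sequential)
println(treeShaped)
```

Only the tree-shaped evaluation happens to match the true mean of 1, 2, 3, 4 here; the point is that the two groupings disagree, which rules the function out as a combOp.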
You can still use Knuth's algorithm to get a mean per partition, though:
def partMean(n: Int)(iter: Iterator[BreezeVector[Double]]) = {
  val partialMean = iter.foldLeft((BreezeVector.zeros[Double](n), 0.0))(
    (acc: (BreezeVector[Double], Double), v: BreezeVector[Double]) =>
      (acc._1 + (v - acc._1) / (acc._2 + 1.0), acc._2 + 1.0))
  Iterator(partialMean)
}

val means = rdd.mapPartitions(partMean(lengthOfVector))
The problem remains how to aggregate the partial results. A direct application of Knuth's algorithm would require unfolding the partitions, which pretty much defeats the purpose of using Spark. You can take a look at the StatCounter.merge method to see how this kind of merge is handled internally in Spark.
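As a sketch of what such a merge could look like (assuming, as in partMean above, that each partition yields a (mean, count) pair; mergeMeans is a made-up name, not a Spark or Breeze API), a count-weighted update is associative in exact arithmetic and can therefore be used with reduce:

```scala
import breeze.linalg.DenseVector

type Partial = (DenseVector[Double], Double) // (partition mean, element count)

// Count-weighted combination of two partial means. In exact arithmetic this
// is associative and commutative, which is what reduce/aggregate require;
// StatCounter.merge uses the same idea for scalar statistics.
def mergeMeans(a: Partial, b: Partial): Partial = {
  val n = a._2 + b._2
  if (n == 0.0) a
  else ((a._1 * a._2 + b._1 * b._2) / n, n)
}

// e.g.: val (mean, count) = means.reduce(mergeMeans)
```

Note this recovers the weighted sum from each partial mean before dividing again, so it trades away some of the streaming update's numerical caution at the merge step; for a handful of partitions that is usually acceptable.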