我有一个RDD,已连接到HBase表。每行(键)代表一个GPS位置。现在,我编写了一个函数来计算两点之间的距离。该函数应与当前行及其前任[i-1]一起调用
现在,我正在努力使用RDD函数以功能方式完成此任务,以便可以并行化它。
我快速而肮脏的方法是首先创建一个数组
val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int])
var i = 0 //can be better solved in scala, I know ;)
rows.foreach(row => {
if (predecssorPoint == null) {
predecssorPoint = getPointByRow(row._2)
}
else {
currentPoint = getPointByRow(row._2)
rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint)
i += 1
predecssorPoint = currentPoint
}
})
return rowArray
然后我将数组并行化并计算距离
//create a parallel-enabled data set
val parallelDataSet = sc.parallelize(rows)
parallelDataSet.foreach(row => {
Functions.logDistance(row)
})
那行得通,但是效率低下,而且效率低下。
我的想法是使用rdd.reduce()摆脱foreach循环,如果distance函数处理无法保证(a + b)排序的问题,则这可能有效。
无论如何,有没有更好的解决方案?我的理解是,在使用RDD时,不可能(高效)进行索引访问。
谢谢。
鉴于排序在这里很关键,一个好的进行方法可能是首先为RDD编制索引。然后,使用索引,我们可以模拟一个zip,并在集群上对元组进行分区。像这样:
val indexed = rdd.zipWithIndex.map(_.swap) //
val shifted = indexed.map{case (k,v) => (k-1,v)}
val joined = indexed.join(shifted)
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)}
(*)示例代码-未测试
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句