顺序RDD处理中的功能性方法[Apache Spark]

马可

我有一个RDD,已连接到HBase表。每行(键)代表一个GPS位置。现在,我编写了一个函数来计算两点之间的距离。该函数应与当前行及其前任[i-1]一起调用

现在,我正在努力使用RDD函数以功能方式完成此任务,以便可以并行化它。

我快速而肮脏的方法是首先创建一个数组

val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int])
var i = 0 //can be better solved in scala, I know ;)

rows.foreach(row => {
  if (predecssorPoint == null) {
    predecssorPoint = getPointByRow(row._2)
  }
  else {
    currentPoint = getPointByRow(row._2)
    rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint)

    i += 1
    predecssorPoint = currentPoint
  }
})

return rowArray

然后我将数组并行化并计算距离

  //create a parallel-enabled data set
  val parallelDataSet = sc.parallelize(rows)

  parallelDataSet.foreach(row => {     
  Functions.logDistance(row)
})

那行得通,但是效率低下,而且效率低下。

我的想法是使用rdd.reduce()摆脱foreach循环,如果distance函数处理无法保证(a + b)排序的问题,则这可能有效。

无论如何,有没有更好的解决方案?我的理解是,在使用RDD时,不可能(高效)进行索引访问。

谢谢。

马斯格

鉴于排序在这里很关键,一个好的进行方法可能是首先为RDD编制索引。然后,使用索引,我们可以模拟一个zip,并在集群上对元组进行分区。像这样:

val indexed = rdd.zipWithIndex.map(_.swap) // 
val shifted = indexed.map{case (k,v) => (k-1,v)}
val joined = indexed.join(shifted)
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)}

(*)示例代码-未测试

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Apache Spark 中 RDD 方法 persist() 和 cache() 的默认持久性级别是多少

来自分类Dev

Apache Spark:在RDD中处理Option / Some / None

来自分类Dev

Spark Streaming中的顺序处理

来自分类Dev

将批处理RDD中的结果与Apache Spark中的流式RDD合并

来自分类Dev

Apache Spark RDD替代

来自分类Dev

根据SPARK scala中的条件处理RDD

来自分类Dev

等效于Apache Spark RDD中的getLines

来自分类Dev

如何减少RDD在Apache Spark中的工作

来自分类Dev

Apache Spark: reading RDD from Spark Cluster

来自分类Dev

Apache Spark-处理基于临时RDD的Windows

来自分类Dev

Spark中的键值对顺序

来自分类Dev

apache spark中的批处理API调用?

来自分类Dev

Spark-如何正确处理RDD.map()方法中的错误情况?

来自分类Dev

Apache Spark中的“哪里”

来自分类Dev

Flink中Spark RDD.persist(..)的等效功能吗?

来自分类Dev

巧妙地处理Spark RDD中的Option [T]

来自分类Dev

如何在 spark rdd 生成中处理 CSV 文件列?

来自分类Dev

从Spark RDD中删除元素

来自分类Dev

在Scala Spark中嵌套RDD

来自分类Dev

从Spark RDD中删除元素

来自分类Dev

Apache Spark MultilayerPerceptronClassifier设置功能

来自分类Dev

Apache Spark方法返回RDD(具有尾递归)

来自分类Dev

Apache Spark处理偏斜数据

来自分类Dev

使用Scala在Apache Spark中连接不同RDD的数据集

来自分类Dev

在Apache Spark中,如何使RDD / DataFrame操作变得懒惰?

来自分类Dev

在Apache Spark中缓存RDD的目的是什么?

来自分类Dev

是否可以在Apache Spark中创建嵌套的RDD?

来自分类Dev

如何在Apache Spark的RDD中传递血统

来自分类Dev

Tachyon是否默认由Apache Spark中的RDD实现?