如何使用 Spark RDD 生成或映射到另一个 RDD

绝地小子

我有一个键/值RDD,我想“迭代”其中的实体,键/值,并创建或映射到另一个RDD,该RDD可能具有比第一个RDD更多或更少的条目。

例子:

我有积累记录,代表对绘画中颜色的观察。观察实体/对象保存有关绘画名称和绘画颜色的数据。

Observation
public String getPaintingName() {return paintingName;}
public List<String> getObservedColors() {return colorList}

我将 accumulo 中的观察结果作为 RDD 提取到我的代码中。

val observationRDD: RDD[(Text, Observation)] = getObservationsFromAccumulo();

我想用这个 RDD 并创建一个 (Color,paintingName) 形式的 RDD,其中键是观察到的颜色,值是观察到颜色的绘画名称。

 val colorToPaintingRDD: RDD[(String, String)] = observationRDD.somefunction({ case (_, observation) =>
    for(String color : observations.getObservedColors()) {
       // Some how output a entry into a new RDD
       //output/map (color, observation.getPaintingName)
 })

我知道 map 不能工作,因为它是 1 比 1,我想可能是 ObservationRDD.flatmap(some function) 但似乎找不到任何关于如何创建新的、更大或更小的 RDD 的示例。

有人可以帮助我并告诉我 flatmap 是否正确,如果是,请使用我提供的这个示例给我一个示例,或者告诉我我是否离基地很远?

请理解这只是一个简单的例子,它不是我要问的内容,而是如何将 RDD 转换为具有更多或更少条目的 RDD。

气体

您应该使用 flatmap 并为 RDD 中的每个元素返回一个 List[(String, String)] 。FlatMap 会将结果弄平,你会得到一个 RDD[(String, String)]

我没有尝试代码,但它会是这样的:

val colorToPaintingRDD: RDD[(String, String)] = observationRDD.flatMap { case (_, observation) =>
    observations.getObservedColors().map(color => (color, observation.getPaintingName))
}

可能如果 getObservedColors 方法在 Java 中,您必须导入 JavaConversions 并更改为 Scala 列表。

import scala.collection.JavaConversions._
observations.getObservedColors().toList

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Apache Spark-使用2个RDD:RDD的补充

来自分类Dev

Spark:如何按时间范围加入RDD

来自分类Dev

与两个RDD一起使用Apache Spark

来自分类Dev

如何根据基于Spark中另一个RDD的函数过滤RDD?

来自分类Dev

根据Spark中的另一个RDD进行过滤

来自分类Dev

持久性Spark RDD在另一个Spark Shell中不可用

来自分类Dev

Spark RDD-它们如何工作

来自分类Dev

Spark:如何将RDD的Seq转换为RDD

来自分类Dev

如何获得Spark RDD的第n行?

来自分类Dev

如何取消Spark Hadoop RDD计算

来自分类Dev

如何在Spark中将单个RDD划分为多个RDD

来自分类Dev

使用SSD进行SPARK RDD

来自分类Dev

Spark:如何将List <RDD>联合到RDD

来自分类Dev

从SPARK中的另一个RDD返回最大N个值的RDD

来自分类Dev

如何从Spark中的RDD和DataFrame过滤?

来自分类Dev

Spark如何决定如何对RDD进行分区?

来自分类Dev

如何使两个Spark RDD并行运行

来自分类Dev

如何从RDD创建Spark数据集

来自分类Dev

Spark Streaming:如何定期刷新缓存的RDD?

来自分类Dev

使用必需的特定列从另一个rdd创建rdd

来自分类Dev

使用正则表达式基于另一个 RDD 过滤一个 RDD

来自分类Dev

RDD 将每个子串映射到另一个 RDD

来自分类Dev

如何在 Spark (Scala) 中组合两个 RDD?

来自分类Dev

spark:根据另一个 rdd 的顺序加入 rdd

来自分类Dev

如何在另一个 RDD 中压缩带有相应元素的 RDD?

来自分类Dev

如何在 spark rdd 生成中处理 CSV 文件列?

来自分类Dev

如何将随机 rdd 加入另一个 rdd?

来自分类Dev

如何使用 Spark Scala 加入 3 个 RDD

来自分类Dev

Scala 嵌套映射到 Spark RDD