我有一个键/值RDD,我想“迭代”其中的实体,键/值,并创建或映射到另一个RDD,该RDD可能具有比第一个RDD更多或更少的条目。
例子:
我有积累记录,代表对绘画中颜色的观察。观察实体/对象保存有关绘画名称和绘画颜色的数据。
Observation
public String getPaintingName() {return paintingName;}
public List<String> getObservedColors() {return colorList}
我将 accumulo 中的观察结果作为 RDD 提取到我的代码中。
val observationRDD: RDD[(Text, Observation)] = getObservationsFromAccumulo();
我想用这个 RDD 并创建一个 (Color,paintingName) 形式的 RDD,其中键是观察到的颜色,值是观察到颜色的绘画名称。
val colorToPaintingRDD: RDD[(String, String)] = observationRDD.somefunction({ case (_, observation) =>
for(String color : observations.getObservedColors()) {
// Some how output a entry into a new RDD
//output/map (color, observation.getPaintingName)
})
我知道 map 不能工作,因为它是 1 比 1,我想可能是 ObservationRDD.flatmap(some function) 但似乎找不到任何关于如何创建新的、更大或更小的 RDD 的示例。
有人可以帮助我并告诉我 flatmap 是否正确,如果是,请使用我提供的这个示例给我一个示例,或者告诉我我是否离基地很远?
请理解这只是一个简单的例子,它不是我要问的内容,而是如何将 RDD 转换为具有更多或更少条目的 RDD。
您应该使用 flatmap 并为 RDD 中的每个元素返回一个 List[(String, String)] 。FlatMap 会将结果弄平,你会得到一个 RDD[(String, String)]
我没有尝试代码,但它会是这样的:
val colorToPaintingRDD: RDD[(String, String)] = observationRDD.flatMap { case (_, observation) =>
observations.getObservedColors().map(color => (color, observation.getPaintingName))
}
可能如果 getObservedColors 方法在 Java 中,您必须导入 JavaConversions 并更改为 Scala 列表。
import scala.collection.JavaConversions._
observations.getObservedColors().toList
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句