我有一个 RDD[(Int, ListBuffer[Byte])] 并且我喜欢执行“wordcount”但是对于列表中的每个数字。例如,RDD是:
(31000,ListBuffer(1, 1, 0, 1, 0, 1, 1, 1, 1))
(21010,ListBuffer(0, 0, 0))
(23000,ListBuffer(1, 1, 1, 1, 1))
(01000,ListBuffer(1, 1))
(34000,ListBuffer(0))
我想得到这个:
(31000,(0,2),(1,7)) // this could be a Map[0=>2, 1=>7]
(21010,(0,3))
(23000,(1,5))
(01000,(1,2))
(34000,(0,3))
任何指导?先感谢您
编辑:有人建议我的问题是重复的,但问题是建议的帖子只是一个列表,但我想申请一对(Int,List)。
在 Scala 集合中获取值的直方图的最惯用的方法是使用groupBy
后跟一个映射,该映射采用每个结果组的大小:
scala> import collection.mutable.ListBuffer
import collection.mutable.ListBuffer
scala> val values = ListBuffer(1, 1, 0, 1, 0, 1, 1, 1, 1)
values: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 1, 0, 1, 0, 1, 1, 1, 1)
scala> values.groupBy(identity).mapValues(_.size)
res0: scala.collection.immutable.Map[Int,Int] = Map(1 -> 7, 0 -> 2)
在您的情况下,该部分完全独立于 Spark 部分 - 您只是碰巧对 RDD 中的值执行此操作,但完整的解决方案如下所示:
scala> val counts = myRdd.mapValues(_.groupBy(identity).mapValues(_.size))
counts: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Map[Int,Int])] = MapPartitionsRDD[1] at mapValues at <console>:26
scala> counts.foreach(println)
(1000,Map(1 -> 2))
(21010,Map(0 -> 3))
(23000,Map(1 -> 5))
(34000,Map(0 -> 1))
(31000,Map(1 -> 7, 0 -> 2))
值得注意的是,mapValues
Scala 上的集合是惰性的,这意味着每次在 RDD 中使用映射时,都会重新计算值。这可能没问题,但如果您担心,可以将其替换为以下内容:
values.groupBy(identity).map { case (k, v) => k -> v.size }
...这将返回一个严格评估的地图。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句