我有2个RDD,每个RDD是一组包含重复项的字符串。我想找到保留重复项的两个集合的交集。例子:
RDD1 : a, b, b, c, c, c, c
RDD2 : a, a, b, c, c
我想要的交集是集合,a, b, c, c
即交集将包含每个元素出现在两个集合中的最小时间。
默认intersection
转换不保留重复的AFAIK。有没有一种方法可以使用其他一些转换和/或交集变换来有效地计算交集?我正在尝试避免通过算法进行操作,这不太可能像执行Spark方法那样高效。(对于感兴趣的人,我正在尝试为一组文件计算Jaccard bag的相似度)。
从的实现中借用一点intersection
,您可以执行以下操作:
(val rdd1 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "c")))
(val rdd2 = sc.parallelize(Seq("a", "a", "b", "c", "c")))
val cogrouped = rdd1.map(k => (k, null)).cogroup(rdd2.map(k => (k, null)))
val groupSize = cogrouped.map { case (key, (buf1, buf2)) => (key, math.min(buf1.size, buf2.size)) }
val finalSet = groupSize.flatMap { case (key, size) => List.fill(size)(key) }
(finalSet.collect = Array(a, b, c, c))
之所以cogroup
可行,是因为它将为每个分组重复出现一对值(在这种情况下,所有空值)。另请注意,在这里,我们所进行的改组操作不会比最初使用时多intersection
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句