Does anyone know if there is a way, during a transformation, to look at neighboring elements in a sorted RDD? I know I can collect the data and then do an operation like the one in the example below, but that defeats the purpose of a distributed system, which is exactly what I'm trying to leverage.
Example:
RDD of (string name, int val) map to RDD of (string name, int val, int diff)
such that:
name | val    becomes ->    name | val | diff (current - prior)
a    | 3                    a    | 3   | 3
b    | 6                    b    | 6   | 3
c    | 4                    c    | 4   | -2
d    | 20                   d    | 20  | 16
Probably the simplest approach is to convert the RDD to a data frame and use lag:
case class NameValue(name: String, value: Int)
val rdd = sc.parallelize(
  NameValue("a", 3) :: NameValue("b", 6) ::
  NameValue("c", 4) :: NameValue("d", 20) :: Nil)
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("df")
// Note: lag(value) is NULL for the first row, so its difference is NULL as well.
sqlContext.sql("""SELECT name, value,
  value - lag(value) OVER (ORDER BY name, value) lag
  FROM df""").show
Unfortunately, at this moment window functions without a PARTITION BY clause move all data to a single partition, so this is not particularly useful if you have a large dataset.
Using low-level operations, you could use zipWithIndex followed by flatMap and groupByKey:
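case class NameValueWithLag(name: String, value: Int, lag: Int)
// Index of the last element
val cnt = rdd.count() - 1
rdd.
  zipWithIndex.
  // Emit each element under keys i and i - 1, so key k collects elements k and k + 1
  flatMap{case (x, i) => (0 to 1).map(lag => (i - lag, (i, x)))}.
  groupByKey.
  // Key cnt holds only the last element, which key cnt - 1 already covers
  filter{ case (k, v) => k != cnt}.
  values.
  map(vals => {
    val sorted = vals.toArray.sortBy(_._1).map(_._2)
    if (sorted.length == 1) {
      // First element: no predecessor, so the value itself is used as the lag
      NameValueWithLag(sorted(0).name, sorted(0).value, sorted(0).value)
    } else {
      NameValueWithLag(
        sorted(1).name, sorted(1).value,
        sorted(1).value - sorted(0).value
      )
    }
  })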
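To see why this keying pairs each element with its predecessor, here is a minimal sketch of the same steps on a plain local Seq instead of an RDD. It is only an illustration of the logic, not Spark code: groupBy stands in for groupByKey, the object name NeighborKeySketch is made up for this example, and the final sortBy is added only to make the local output order deterministic.

```scala
object NeighborKeySketch {
  case class NameValue(name: String, value: Int)
  case class NameValueWithLag(name: String, value: Int, lag: Int)

  // Local stand-in for the RDD pipeline: same keying, but on a plain Seq.
  def withLag(rows: Seq[NameValue]): Seq[NameValueWithLag] = {
    val last = rows.length - 1
    rows.zipWithIndex
      // Emit each row under its own index i and under i - 1,
      // so key k collects rows k and k + 1.
      .flatMap { case (x, i) => (0 to 1).map(lag => (i - lag, (i, x))) }
      .groupBy(_._1)
      // Key `last` holds only the final row, which key last - 1 already covers.
      .filter { case (k, _) => k != last }
      .toSeq
      .sortBy(_._1)
      .map { case (_, group) =>
        val sorted = group.map(_._2).sortBy(_._1).map(_._2)
        if (sorted.length == 1)
          // Key -1 holds only the first row: no predecessor.
          NameValueWithLag(sorted(0).name, sorted(0).value, sorted(0).value)
        else
          NameValueWithLag(sorted(1).name, sorted(1).value,
            sorted(1).value - sorted(0).value)
      }
  }
}
```

On the four rows from the question this produces the diffs 3, 3, -2 and 16. In the RDD version the same grouping happens through a shuffle, so every element is sent over the network twice.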
Edit:
If you don't mind using a developer API, you can try RDDFunctions.sliding, but it requires some manual processing, because the sliding windows never produce a result for the first element:
import org.apache.spark.mllib.rdd.RDDFunctions._
val first = rdd.first match {
  case NameValue(name, value) => NameValueWithLag(name, value, value)
}
sc.parallelize(Seq(first)).union(rdd
  .sliding(2)
  .map(a => NameValueWithLag(a(1).name, a(1).value, a(1).value - a(0).value)))
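The manual step is needed because windows of size 2 over n elements give only n - 1 pairs. A minimal sketch of the same idea on a local Seq, using the standard collections sliding rather than the MLlib one (the object name SlidingSketch is made up for this example):

```scala
object SlidingSketch {
  case class NameValue(name: String, value: Int)
  case class NameValueWithLag(name: String, value: Int, lag: Int)

  def withLag(rows: Seq[NameValue]): Seq[NameValueWithLag] =
    rows.headOption match {
      case None => Seq.empty
      case Some(h) =>
        // The first row has no predecessor, so it is built by hand.
        val first = NameValueWithLag(h.name, h.value, h.value)
        // sliding(2) yields n - 1 full windows for n rows. On a one-row Seq
        // the collections version yields a single short window, which the
        // pattern below skips.
        val rest = rows.sliding(2).collect {
          case Seq(prev, cur) =>
            NameValueWithLag(cur.name, cur.value, cur.value - prev.value)
        }.toList
        first +: rest
    }
}
```

The structure mirrors the snippet above: one hand-built record for the first row, followed by one record per window.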