我是Scala和Spark的新手。我正在尝试在地图转换期间返回多个键值对。我的输入数据是一个简单的CSV文件。
1,2,3 4,5,6 7,8,9
我的Scala脚本如下所示。
class Key(_i:Integer, _j:Integer) {
def i = _i
def j = _j
}
class Val(_x:Double, _y:Double) {
def x = _x
def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- 0 until arr.length) {
val y = arr(j).toDouble
val k = new Key(i, j)
val v = new Val(x, y)
//note that i want to return the tuples, (k, v)
}
}
我希望能够使用上面的for循环和数据结构返回多个元组(k,v)。类似于下面的代码。
val file = sc.textFile("/path/to/test.csv")
file.map(line => {
val arr = line.split(",")
for(i <- 0 until arr.length) {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) {
val y = arr(j).toDouble
val k = new Index(i,j)
val v = new Val(x,y)
(k,v)
}
}
}).collect //reduceByKey is not there, reduce is there, but not what i want
当我将上面的代码复制/粘贴到lambda表达式中(并在Scala REPL shell上运行)时,出现以下错误:
错误:简单表达式的非法启动 val arr = line.split(“,”) ^
我也意识到我仍然停留在命令式/过程式编程思想中,所以请耐心等待(和Scala / Spark的新手)。
您忘记了箭头后面的括号。如果它是一个简单表达式(一个表达式),则只能忽略它们。
file.map(line => {
//multiple lines of code here
})
编辑后的完整答案:
case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)
val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line=>{
val arr = line.split(",")
val doubleSeq = for(i <- 0 until arr.length) yield {
val x = arr(i).toDouble
for(j <- (i+1) until arr.length) yield {
val y = arr(j).toDouble
val k = Index(i,j)
val v = Val(x,y)
(k,v)
}
}
doubleSeq.flatten
})
实际上有很多问题:
Serializable
map
为flatMap
,以及flatten
ed数组,因为flatMap
仍然会留下一个内部数组。现在,两者的结合将为您带来,您RDD[(Index, Val)]
现在可以将其隐式地与reduceByKey
for
循环变成了一种for
理解yield
。您得到的最终类型是Unit
因为for
循环的返回类型是Unit
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句