可以说我有以下数据框:
var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3)
val df = sc.parallelize(randomData,2).toDF()
而且我有这个功能,它将作为输入mapPartition
:
def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] =
row.toArray.tail.toIterator
并使用地图分区:
df.mapPartition(trialIterator)
我收到以下错误消息:
类型不匹配,预期的(Iterator [Row])=> Iterator [NotInferedR],实际:Iterator [(String,Int)=> Iterator [(String,Int)]
我知道这是由于函数的输入,输出类型引起的,但是如何解决呢?
如果你想获得强类型输入不使用Dataset[Row]
(DataFrame
),但Dataset[T]
其中T
在此特定情形的(String, Int)
。同样不要在不知道分区是否为空的情况下转换为Array
盲目调用tail
:
def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1)
randomData
.toDS // org.apache.spark.sql.Dataset[(String, Int)]
.mapPartitions(trialIterator _)
或者
randomData.toDF // org.apache.spark.sql.Dataset[Row]
.as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)]
.mapPartitions(trialIterator _)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句