Spark:在Scala中使用mapPartition

米皮佐斯·迪米特里斯(Mpizos Dimitris)

可以说我有以下数据框:

var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3)
val df = sc.parallelize(randomData,2).toDF()

而且我有这个功能,它将作为输入mapPartition

def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] =
    row.toArray.tail.toIterator

并使用地图分区:

df.mapPartition(trialIterator)

我收到以下错误消息:

类型不匹配,预期的(Iterator [Row])=> Iterator [NotInferedR],实际:Iterator [(String,Int)=> Iterator [(String,Int)]

我知道这是由于函数的输入,输出类型引起的,但是如何解决呢?

零323

如果你想获得强类型输入不使用Dataset[Row]DataFrame),但Dataset[T]其中T在此特定情形的(String, Int)同样不要在不知道分区是否为空的情况下转换为Array盲目调用tail

def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1)

randomData
  .toDS // org.apache.spark.sql.Dataset[(String, Int)]
  .mapPartitions(trialIterator _)

或者

randomData.toDF // org.apache.spark.sql.Dataset[Row] 
  .as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)]
  .mapPartitions(trialIterator _)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Apache Spark:何时不使用 mapPartition 和 foreachPartition?

来自分类Dev

在Apache Spark(Scala)中使用reduceByKey

来自分类Dev

在Spark Scala中使用struct创建架构

来自分类Dev

在Apache Spark(Scala)中使用reduceByKey

来自分类Dev

在Spark SQL查询中使用Scala列表

来自分类Dev

在Spark Scala中使用map()对键值对重新排序

来自分类Dev

如何在Spark中使用Scala中的countDistinct?

来自分类Dev

在Scala中使用Regex和Spark过滤DataFrame

来自分类Dev

如何在Spark / Scala解释器(REPL)中使用JDBC?

来自分类Dev

保存Spark StandardScaler以供以后在Scala中使用

来自分类Dev

在没有Spark的Scala中使用Parquet-Mr

来自分类Dev

Gradle:在Eclipse中使用Apache Spark设置Scala项目

来自分类Dev

Spark:如何使用mapPartition并在每个分区上创建/关闭连接

来自分类Dev

如何使Apache Spark mapPartition正常工作?

来自分类Dev

Apache Spark mapPartition 奇怪的行为(懒惰评估?)

来自分类Dev

在Spark中使用日期

来自分类Dev

使用Scala在Spark中使用广播变量的正确语法是什么?

来自分类Dev

在Scala中使用注释

来自分类常见问题

在Spark Scala中使用blob元素编写字符串的JSON数组

来自分类Dev

如何在Apache Spark上的Scala中使用可变映射?找不到金钥错误

来自分类Dev

如何在Spark Streaming中使用无限Scala流作为源?

来自分类Dev

如何在Scala Spark文档中使用<A,U> RDD <U>解释方法?

来自分类Dev

如何在JSON中使用read.schema仅指定特定字段:SPARK Scala

来自分类Dev

在Spark / Scala ML中使用RegexTokenizer()后,StopWords()无法正常工作

来自分类Dev

在Spark Scala中使用blob元素编写字符串的JSON数组

来自分类Dev

如何在Scala Spark中使用窗口滞后来查找更改

来自分类Dev

如何在Scala Spark文档中使用<A,U> RDD <U>解释方法?

来自分类Dev

如何在集群环境中使用Spark Scala分发处理以在csv中找到waldos?

来自分类Dev

如何在JSON中使用read.schema仅指定特定字段:SPARK Scala

Related 相关文章

  1. 1

    Apache Spark:何时不使用 mapPartition 和 foreachPartition?

  2. 2

    在Apache Spark(Scala)中使用reduceByKey

  3. 3

    在Spark Scala中使用struct创建架构

  4. 4

    在Apache Spark(Scala)中使用reduceByKey

  5. 5

    在Spark SQL查询中使用Scala列表

  6. 6

    在Spark Scala中使用map()对键值对重新排序

  7. 7

    如何在Spark中使用Scala中的countDistinct?

  8. 8

    在Scala中使用Regex和Spark过滤DataFrame

  9. 9

    如何在Spark / Scala解释器(REPL)中使用JDBC?

  10. 10

    保存Spark StandardScaler以供以后在Scala中使用

  11. 11

    在没有Spark的Scala中使用Parquet-Mr

  12. 12

    Gradle:在Eclipse中使用Apache Spark设置Scala项目

  13. 13

    Spark:如何使用mapPartition并在每个分区上创建/关闭连接

  14. 14

    如何使Apache Spark mapPartition正常工作?

  15. 15

    Apache Spark mapPartition 奇怪的行为(懒惰评估?)

  16. 16

    在Spark中使用日期

  17. 17

    使用Scala在Spark中使用广播变量的正确语法是什么?

  18. 18

    在Scala中使用注释

  19. 19

    在Spark Scala中使用blob元素编写字符串的JSON数组

  20. 20

    如何在Apache Spark上的Scala中使用可变映射?找不到金钥错误

  21. 21

    如何在Spark Streaming中使用无限Scala流作为源?

  22. 22

    如何在Scala Spark文档中使用<A,U> RDD <U>解释方法?

  23. 23

    如何在JSON中使用read.schema仅指定特定字段:SPARK Scala

  24. 24

    在Spark / Scala ML中使用RegexTokenizer()后,StopWords()无法正常工作

  25. 25

    在Spark Scala中使用blob元素编写字符串的JSON数组

  26. 26

    如何在Scala Spark中使用窗口滞后来查找更改

  27. 27

    如何在Scala Spark文档中使用<A,U> RDD <U>解释方法?

  28. 28

    如何在集群环境中使用Spark Scala分发处理以在csv中找到waldos?

  29. 29

    如何在JSON中使用read.schema仅指定特定字段:SPARK Scala

热门标签

归档