如何处理来自 Dataframe 的 ValueError 使用 Scala

布莱恩·K。

我正在使用 Scala 开发 Spark,但我没有任何 Scala 背景。我还没有收到 ValueError,但我正在为我的代码准备 ValueError 处理程序。

|location|arrDate|deptDate|
|JFK     |1201   |1209    |
|LAX     |1208   |1212    |
|NYC     |       |1209    |
|22      |1201   |1209    |
|SFO     |1202   |1209    |

如果我们有这样的数据,我想将第三行和第四行存储到 Error.dat 中,然后再次处理第五行。在错误日志中,我想放数据的信息,例如哪个文件,行数,错误的详细信息。对于记录器,我现在使用 log4j。

实现该功能的最佳方法是什么?你们能帮我吗?

流氓

我假设所有三列都是字符串类型。在这种情况下,我将使用以下代码段解决此问题。我创建了两个 udf 来检查错误记录。

  • 如果字段只有数字字符 [ isNumber]
  • 如果字符串字段为空 [ isEmpty]

代码段

 import org.apache.spark.sql.functions.row_number
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.udf

 val df = rdd.zipWithIndex.map({case ((x,y,z),index) => (index+1,x,y,z)}).toDF("row_num", "c1", "c2", "c3")
 val isNumber = udf((x: String) => x.replaceAll("\\d","") == "")
 val isEmpty = udf((x: String) => x.trim.length==0)
 val errDF = df.filter(isNumber($"c1") || isEmpty($"c2"))
 val validDF = df.filter(!(isNumber($"c1") || isEmpty($"c2")))


scala> df.show()
+-------+---+-----+-----+
|row_num| c1|   c2|   c3|
+-------+---+-----+-----+
|      1|JFK| 1201| 1209|
|      2|LAX| 1208| 1212|
|      3|NYC|     | 1209|
|      4| 22| 1201| 1209|
|      5|SFO| 1202| 1209|
+-------+---+-----+-----+

scala> errDF.show()
+-------+---+----+----+
|row_num| c1|  c2|  c3|
+-------+---+----+----+
|      3|NYC|    |1209|
|      4| 22|1201|1209|
+-------+---+----+----+

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何处理ValueError?

来自分类Dev

如何使用spark / scala像show()对dataframe那样显示来自列函数的结果结果

来自分类Dev

如何使用spark / scala像show()对dataframe那样显示来自列函数的结果结果

来自分类Dev

如何使用Databricks的PySpark中Scala中创建的DataFrame

来自分类Dev

scala - 如何使用 concat_ws 连接 DataFrame 的列?

来自分类Dev

如何使用嵌套的案例类模式模拟 Spark Scala DataFrame?

来自分类Dev

如何使用流查询处理来自Kafka的Scala案例类对象?

来自分类Dev

Akka / scala-OversizedPayloadException如何处理?

来自分类Dev

Scala如何处理同伴对象?

来自分类Dev

播放2.2.2(Scala),如何处理HttpServletRequests

来自分类Dev

如何处理sbt,scala,库的版本

来自分类Dev

Scala:如何处理列表中的对象

来自分类Dev

使用Java配置的Spring Security:如何处理来自自定义提供程序的BadCredentialsException

来自分类Dev

使用XSL合并后如何处理来自多个XML文件的所有数据

来自分类Dev

在 Python 中使用 json.loads 时,如何处理来自 CSV 的非 ascii 字符?

来自分类Dev

与python一起使用时,如何处理来自谷歌地图距离矩阵api的输出?

来自分类Dev

来自 DataFrame 的 Spark Scala 动态列选择

来自分类Dev

使用Scala的API替换DataFrame的值

来自分类Dev

Scala-使用“ endsWith”过滤DataFrame

来自分类Dev

使用 DataFrame 类型在 Scala 中定义函数

来自分类Dev

如何处理ValueError:Python中float()的无效文字

来自分类Dev

如何使用Scala在Spark中处理日期?

来自分类Dev

如何使用Scala在Spark DataFrame中将每一行分成多行

来自分类Dev

Spark Scala:如何使用DataFrame.Select在UDF中传递列名

来自分类Dev

如何使用Scala将DataSet传递给在Apache Spark中接受DataFrame作为参数的函数?

来自分类Dev

如何在 Spark 中使用 scala 将 RDD[DataFrame] 中的所有 DataFrame 联合到一个 DataFrame 而不使用 for 循环?

来自分类Dev

Spark DataFrame如何处理大于内存的Pandas DataFrame

来自分类Dev

测试Dataframe内容的真值时如何解决ValueError?Python

来自分类Dev

如何处理来自使用Spring集成的AWS SQS FIFO队列超过10个并发邮件

Related 相关文章

  1. 1

    如何处理ValueError?

  2. 2

    如何使用spark / scala像show()对dataframe那样显示来自列函数的结果结果

  3. 3

    如何使用spark / scala像show()对dataframe那样显示来自列函数的结果结果

  4. 4

    如何使用Databricks的PySpark中Scala中创建的DataFrame

  5. 5

    scala - 如何使用 concat_ws 连接 DataFrame 的列?

  6. 6

    如何使用嵌套的案例类模式模拟 Spark Scala DataFrame?

  7. 7

    如何使用流查询处理来自Kafka的Scala案例类对象?

  8. 8

    Akka / scala-OversizedPayloadException如何处理?

  9. 9

    Scala如何处理同伴对象?

  10. 10

    播放2.2.2(Scala),如何处理HttpServletRequests

  11. 11

    如何处理sbt,scala,库的版本

  12. 12

    Scala:如何处理列表中的对象

  13. 13

    使用Java配置的Spring Security:如何处理来自自定义提供程序的BadCredentialsException

  14. 14

    使用XSL合并后如何处理来自多个XML文件的所有数据

  15. 15

    在 Python 中使用 json.loads 时,如何处理来自 CSV 的非 ascii 字符?

  16. 16

    与python一起使用时,如何处理来自谷歌地图距离矩阵api的输出?

  17. 17

    来自 DataFrame 的 Spark Scala 动态列选择

  18. 18

    使用Scala的API替换DataFrame的值

  19. 19

    Scala-使用“ endsWith”过滤DataFrame

  20. 20

    使用 DataFrame 类型在 Scala 中定义函数

  21. 21

    如何处理ValueError:Python中float()的无效文字

  22. 22

    如何使用Scala在Spark中处理日期?

  23. 23

    如何使用Scala在Spark DataFrame中将每一行分成多行

  24. 24

    Spark Scala:如何使用DataFrame.Select在UDF中传递列名

  25. 25

    如何使用Scala将DataSet传递给在Apache Spark中接受DataFrame作为参数的函数?

  26. 26

    如何在 Spark 中使用 scala 将 RDD[DataFrame] 中的所有 DataFrame 联合到一个 DataFrame 而不使用 for 循环?

  27. 27

    Spark DataFrame如何处理大于内存的Pandas DataFrame

  28. 28

    测试Dataframe内容的真值时如何解决ValueError?Python

  29. 29

    如何处理来自使用Spring集成的AWS SQS FIFO队列超过10个并发邮件

热门标签

归档