我正在使用 Scala 开发 Spark,但我没有任何 Scala 背景。我还没有收到 ValueError,但我正在为我的代码准备 ValueError 处理程序。
|location|arrDate|deptDate|
|JFK |1201 |1209 |
|LAX |1208 |1212 |
|NYC | |1209 |
|22 |1201 |1209 |
|SFO |1202 |1209 |
如果我们有这样的数据,我想将第三行和第四行存储到 Error.dat 中,然后再次处理第五行。在错误日志中,我想放数据的信息,例如哪个文件,行数,错误的详细信息。对于记录器,我现在使用 log4j。
实现该功能的最佳方法是什么?你们能帮我吗?
我假设所有三列都是字符串类型。在这种情况下,我将使用以下代码段解决此问题。我创建了两个 udf 来检查错误记录。
isNumber
]isEmpty
]代码段
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.udf
val df = rdd.zipWithIndex.map({case ((x,y,z),index) => (index+1,x,y,z)}).toDF("row_num", "c1", "c2", "c3")
val isNumber = udf((x: String) => x.replaceAll("\\d","") == "")
val isEmpty = udf((x: String) => x.trim.length==0)
val errDF = df.filter(isNumber($"c1") || isEmpty($"c2"))
val validDF = df.filter(!(isNumber($"c1") || isEmpty($"c2")))
scala> df.show()
+-------+---+-----+-----+
|row_num| c1| c2| c3|
+-------+---+-----+-----+
| 1|JFK| 1201| 1209|
| 2|LAX| 1208| 1212|
| 3|NYC| | 1209|
| 4| 22| 1201| 1209|
| 5|SFO| 1202| 1209|
+-------+---+-----+-----+
scala> errDF.show()
+-------+---+----+----+
|row_num| c1| c2| c3|
+-------+---+----+----+
| 3|NYC| |1209|
| 4| 22|1201|1209|
+-------+---+----+----+
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句