这个问题来自(Spark-以编程方式创建具有不同数据类型的模式)的参考
我正在尝试从rdd到Dataframe推断架构,下面是我的代码
def inferType(field: String) = field.split(":")(1) match {
case "Integer" => IntegerType
case "Double" => DoubleType
case "String" => StringType
case "Timestamp" => TimestampType
case "Date" => DateType
case "Long" => LongType
case _ => StringType
}
val header = c1:String|c2:String|c3:Double|c4:Integer|c5:String|c6:Timestamp|c7:Long|c8:Date
val df1 = Seq(("a|b|44.44|5|c|2018-01-01 01:00:00|456|2018-01-01")).toDF("data")
val rdd1 = df1.rdd.map(x => Row(x.getString(0).split("\\|"): _*))
val schema = StructType(header.split("\\|").map(column => StructField(column.split(":")(0), inferType(column), true)))
val df = spark.createDataFrame(rdd1, schema)
df.show()
当我表演节目时,抛出以下错误。我必须在较大规模的数据上执行此操作,并且在找到正确的解决方案时遇到了麻烦,请问有人可以帮助我找到用于此目的或其他任何方式的解决方案,我可以在其中实现此目的。
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
提前致谢
简短答案:不能使用自定义类型/格式指定字符串/文本。
您想要做的是将字符串解析为sql列。与其他示例的区别在于,您正在尝试从csv加载。工作版本可以这样实现:
// skipped other details such as schematype, spark session...
val header = "c1:String|c2:String|c3:Double|c4:Integer"
// Create `Row` from `Seq`
val row = Row.fromSeq(Seq("a|b|44.44|12|"))
// Create `RDD` from `Row`
val rdd: RDD[Row] = spark.sparkContext
.makeRDD(List(row))
.map { row =>
row.getString(0).split("\\|") match {
case Array(col1, col2, col3, col4) =>
Row.fromTuple(col1, col2, col3.toDouble, col4.toInt)
}
}
val stt: StructType = StructType(
header
.split("\\|")
.map(column => StructField(column, inferType(column), true))
)
val dataFrame = spark.createDataFrame(rdd, stt)
dataFrame.show()
从Scala类型创建Row的原因是Row
在此处引入了兼容类型或受尊重的类型。
注意我跳过了与日期和时间相关的字段,所以日期转换很棘手。你可以检查我的另一个答案如何使用格式化的日期和时间戳这里
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句