在Spark Scala中从rdd到Schema推断架构

巴布

这个问题来自(Spark-以编程方式创建具有不同数据类型的模式)的参考

我正在尝试从rdd到Dataframe推断架构,下面是我的代码

 def inferType(field: String) = field.split(":")(1) match {
    case "Integer" => IntegerType
    case "Double" => DoubleType
    case "String" => StringType
    case "Timestamp" => TimestampType
    case "Date" => DateType
    case "Long" => LongType
    case _ => StringType
 }


val header = c1:String|c2:String|c3:Double|c4:Integer|c5:String|c6:Timestamp|c7:Long|c8:Date

val df1 = Seq(("a|b|44.44|5|c|2018-01-01 01:00:00|456|2018-01-01")).toDF("data")
val rdd1 = df1.rdd.map(x => Row(x.getString(0).split("\\|"): _*))

val schema = StructType(header.split("\\|").map(column => StructField(column.split(":")(0), inferType(column), true)))
val df = spark.createDataFrame(rdd1, schema)
df.show()

当我表演节目时,抛出以下错误。我必须在较大规模的数据上执行此操作,并且在找到正确的解决方案时遇到了麻烦,请问有人可以帮助我找到用于此目的或其他任何方式的解决方案,我可以在其中实现此目的。

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int

提前致谢

鲍勃

简短答案:不能使用自定义类型/格式指定字符串/文本。

您想要做的是将字符串解析为sql列。与其他示例的区别在于,您正在尝试从csv加载。工作版本可以这样实现:

// skipped other details such as schematype, spark session...

val header = "c1:String|c2:String|c3:Double|c4:Integer"

// Create `Row` from `Seq`
val row = Row.fromSeq(Seq("a|b|44.44|12|"))

// Create `RDD` from `Row`
val rdd: RDD[Row] = spark.sparkContext
  .makeRDD(List(row))
  .map { row =>
    row.getString(0).split("\\|") match {
      case Array(col1, col2, col3, col4) =>
        Row.fromTuple(col1, col2, col3.toDouble, col4.toInt)
    }
  }
val stt: StructType = StructType(
  header
    .split("\\|")
    .map(column => StructField(column, inferType(column), true))
)

val dataFrame = spark.createDataFrame(rdd, stt)
dataFrame.show()

从Scala类型创建Row的原因是Row在此处引入了兼容类型或受尊重的类型。
注意我跳过了与日期和时间相关的字段,所以日期转换很棘手。你可以检查我的另一个答案如何使用格式化的日期和时间戳这里

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在Scala Spark中嵌套RDD

来自分类Dev

Spark-Scala RDD

来自分类Dev

将读取文件的架构存储到Spark Scala中的csv文件中

来自分类Dev

根据SPARK scala中的条件处理RDD

来自分类Dev

Scala Spark中的RDD过滤器

来自分类Dev

使用scala在spark中创建对RDD

来自分类Dev

RDD scala spark中的全外连接

来自分类Dev

模式匹配-Spark Scala RDD

来自分类Dev

模式匹配-Spark Scala RDD

来自分类Dev

使用Scala在Spark DataFrame中从JSON重用架构

来自分类Dev

Spark Scala: retrieve the schema and store it

来自分类Dev

如何在Scala Spark中对RDD进行排序?

来自分类Dev

在Scala Spark中未调用RDD的Map函数

来自分类常见问题

如何在Scala的Spark RDD中避免使用collect?

来自分类Dev

Spark&Scala-无法从RDD中过滤空值

来自分类Dev

使用Scala在Apache Spark中连接不同RDD的数据集

来自分类Dev

Spark:Scala RDD中的组Concat等效项

来自分类Dev

如何在Scala的Spark RDD中避免使用collect?

来自分类Dev

在rdd spark scala中split()之后如何过滤?

来自分类Dev

Spark Scala中的Array [RDD [(String,Set [String])]]转换

来自分类Dev

Spark,Scala:如何根据键对减去RDD对中的值?

来自分类Dev

在apache-spark scala中访问Array RDD的特定元素

来自分类Dev

如何在 Spark (Scala) 中组合两个 RDD?

来自分类Dev

在 Scala-Spark1.5.2 中递归过滤 RDD

来自分类Dev

如何使用 spark-scala 删除 rdd 中的 unicode?

来自分类Dev

从Spark RDD中删除元素

来自分类Dev

从Spark RDD中删除元素

来自分类Dev

在Spark Scala中旋转

来自分类Dev

在Spark Scala中合并