我在Scala中使用Spark。
我有一个包含3列的数据框:ID,时间,RawHexdata。我有一个用户定义的函数,该函数接受RawHexData并将其扩展为X个以上的列。重要的是要声明,对于每行X都是相同的(列不变化)。但是,在接收第一个数据之前,我不知道这些列是什么。但是一旦有了头,我就可以推断出来。
我想要第二个带有所述列的数据框:Id,Time,RawHexData,NewCol1,...,NewCol3。
我可以想到的“最简单”的方法是:1.将每行反序列化为json(每个数据类型都可以在此处序列化)2.添加我的新列,3.从更改后的json反序列化新的数据帧,
但是,这似乎很浪费,因为它涉及2个昂贵且冗余的json序列化步骤。我正在寻找一种更清洁的图案。
使用案例类似乎是个坏主意,因为我不知道列数,也不知道列名。
动态扩展您可以做的DataFrame
是对RDD行进行操作,您可以通过调用获得该行dataFrame.rdd
。有了Row
实例,您可以访问该RawHexdata
列并解析包含的数据。通过将新解析的列添加到结果中Row
,您几乎解决了您的问题。将a转换RDD[Row]
回a的唯一必要步骤DataFrame
就是为新列生成模式数据。您可以通过RawHexdata
在驱动程序上收集单个值,然后提取列类型来做到这一点。
以下代码说明了这种方法。
object App {
case class Person(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
val dataFrame = input.df
dataFrame.show()
// create the extended rows RDD
val rowRDD = dataFrame.rdd.map{
row =>
val blob = row(1).asInstanceOf[Int]
val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
Row.fromSeq(row.toSeq.init ++ newColumns)
}
val schema = dataFrame.schema
// we know that the new columns are all integers
val newColumns = StructType{
Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
}
val newSchema = StructType(schema.init ++ newColumns)
val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)
newDataFrame.show()
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句