我有一张镶木地板,其中一列是
,数组<struct <col1,col2,.. colN >>
可以使用LATERAL VIEW语法在Hive中对此表运行查询。
如何将此表读入RDD,更重要的是如何在Spark中过滤,映射此嵌套集合等?
在Spark文档中找不到对此的任何引用。在此先感谢您提供任何信息!
ps。我觉得在表格上提供一些数据可能会有所帮助。主表中的列数约为600。行数〜200m。嵌套集合中的“列”数约为10。嵌套集合中的平均记录数约为35。
在嵌套集合的情况下没有魔术。Spark将以与aRDD[(String, String)]
和a相同的方式处理RDD[(String, Seq[String])]
。
但是,从Parquet文件中读取此类嵌套集合可能很棘手。
让我们以spark-shell
(1.3.1)为例:
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
编写实木复合地板文件:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
读取实木复合地板文件:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
重要的是row.getAs[Seq[Row]](1)
。struct
is的嵌套序列的内部表示形式ArrayBuffer[Row]
,可以使用它的任何超类型代替Seq[Row]
。的1
是外部行中的列索引。我在getAs
这里使用了该方法,但最新版本的Spark中有替代方法。请参阅Row特质的源代码。
现在有了RDD[Outer]
,您可以应用任何所需的转换或操作。
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
请注意,我们仅使用spark-SQL库读取镶木地板文件。例如,您可以直接在DataFrame上选择所需的列,然后再将其映射到RDD。
dataFrame.select('col1, 'col2).map { row => ... }
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句