Spark Dataframe基于动态选择的列提取列

酷鹅

输入数据框的架构

- employeeKey (int)  
- employeeTypeId (string) 
- loginDate (string)
- employeeDetailsJson (string)
{"Grade":"100","ValidTill":"2021-12-01","Supervisor":"Alex","Vendor":"technicia","HourlyRate":29}

对于彼尔姆员工,某些属性可用,而有些则不可用。与签约员工相同。

因此,寻找一种有效的方法来仅基于选定的列来构建数据框,而不是转换所有列并选择我需要的列。

另外请告知这是基于键从json字符串中提取值的最佳方法。由于字符串中的属性是动态的,因此无法基于它构建StructSchema。所以用好旧的get_json_object

(火花2.45,将来会使用spark 3)

  val dfSelectColumns=List("Employee-Key", "Employee-Type","Login-Date","cont.Vendor-Name","cont.Hourly-Rate" )

//val dfSelectColumns=List("Employee-Key", "Employee-Type","Login-Date","perm.Level","perm-Validity","perm.Supervisor" )

 val resultDF = inputDF.get
        .withColumn("Employee-Key", col("employeeKey"))
        .withColumn("Employee-Type", when(col("employeeTypeId") === 1, "Permanent")
          .when(col("employeeTypeId") === 2, "Contractor")
          .otherwise("unknown"))  
        .withColumn("Login-Date", to_utc_timestamp(to_timestamp(col("loginDate"), "yyyy-MM-dd'T'HH:mm:ss"), ""America/Chicago""))
        .withColumn("perm.Level", get_json_object(col("employeeDetailsJson"), "$.Grade"))
        .withColumn("perm.Validity", get_json_object(col("employeeDetailsJson"), "$.ValidTill"))
        .withColumn("perm.SuperVisor", get_json_object(col("employeeDetailsJson"), "$.Supervisor"))
        .withColumn("cont.Vendor-Name", get_json_object(col("employeeDetailsJson"), "$.Vendor"))
        .withColumn("cont.Hourly-Rate", get_json_object(col("employeeDetailsJson"), "$.HourlyRate"))
        .select(dfSelectColumns.head, dfSelectColumns.tail: _*)

空中黄油

我看到您有2个模式,一个用于永久模式,另一个用于承包商。您可以有2个架构。

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schemaBase = new StructType().add("Employee-Key", IntegerType).add("Employee-Type", StringType).add("Login-Date", DateType)
val schemaPerm = schemaBase.add("Level", IntegerType).add("Validity", StringType)// Permanent attributes
val schemaCont = schemaBase.add("Vendor", StringType).add("HourlyRate", DoubleType)  // Contractor attributes

然后,您可以使用2个架构将数据加载到数据框中。
对于永久雇员:

val jsonPermDf = Seq( // Construct sample dataframe
  (2, """{"Employee-Key":2, "Employee-Type":"Permanent", "Login-Date":"2021-11-01", "Level":3, "Validity":"ok"}""")
  , (3, """{"Employee-Key":3, "Employee-Type":"Permanent", "Login-Date":"2020-10-01", "Level":2, "Validity":"ok-yes"}""")
).toDF("key", "raw_json")

val permDf = jsonPermDf.withColumn("data", from_json(col("raw_json"),schemaPerm)).select($"data.*")
permDf.show()

对于承包商:

val jsonContDf = Seq(  // Construct sample dataframe
  (1, """{"Employee-Key":1, "Employee-Type":"Contractor", "Login-Date":"2021-12-01", "Vendor":"technicia", "HourlyRate":29}""")
  , (4, """{"Employee-Key":4, "Employee-Type":"Contractor", "Login-Date":"2019-09-01", "Vendor":"Minis", "HourlyRate":35}""")
).toDF("key", "raw_json")

val contDf = jsonContDf.withColumn("data", from_json(col("raw_json"),schemaCont)).select($"data.*")
contDf.show()

这是永久的结果数据:

+------------+-------------+----------+-----+--------+
|Employee-Key|Employee-Type|Login-Date|Level|Validity|
+------------+-------------+----------+-----+--------+
|           2|    Permanent|2021-11-01|    3|      ok|
|           3|    Permanent|2020-10-01|    2|  ok-yes|
+------------+-------------+----------+-----+--------+

这是承包商的结果数据框:

+------------+-------------+----------+---------+----------+
|Employee-Key|Employee-Type|Login-Date|   Vendor|HourlyRate|
+------------+-------------+----------+---------+----------+
|           1|   Contractor|2021-12-01|technicia|      29.0|
|           4|   Contractor|2019-09-01|    Minis|      35.0|
+------------+-------------+----------+---------+----------+

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

来自 DataFrame 的 Spark Scala 动态列选择

来自分类Dev

基于Spark中列值的动态regexp_extract

来自分类Dev

从 Spark 列中提取单词

来自分类常见问题

在Apache Spark中将Dataframe的列值提取为列表

来自分类Dev

Scala Spark->从DataFrame中选择前15列

来自分类Dev

Spark dataframe 列内容修改

来自分类Dev

如何基于Spark Scala中的列dtypes返回DataFrame列的子集

来自分类Dev

如何在Spark DataFrame中基于B列获取A列的5条记录

来自分类Dev

带有Spark的Elasticsearch,基于数据框列的动态索引创建

来自分类Dev

Spark:将 UDF 应用于 Dataframe 基于 DF 中的值生成新列

来自分类Dev

根据Spark中的列值选择文字

来自分类Dev

选择列RDD scala-spark

来自分类Dev

提取Spark Dataframe中已解码列的特定结构字段

来自分类Dev

如何将时间戳的小时部分提取到DataFrame Spark中的新列

来自分类Dev

基于 Spark 中 2 列组合的新列

来自分类Dev

Scala + Spark + Dataframe异常当我尝试动态转换列并分配排序顺序时

来自分类Dev

Spark DataFrame 不允许在 group by 中具有窗口函数的多个动态列列表

来自分类Dev

Spark DataFrame将多行转换为列

来自分类Dev

Spark DataFrame使列空值变为空

来自分类Dev

Spark:保存由“虚拟”列划分的DataFrame

来自分类Dev

Spark DataFrame从子查询添加列

来自分类Dev

遍历C#中的spark dataframe列

来自分类Dev

Spark Dataframe中多列的每行排名

来自分类Dev

Spark Hive:无法检索 DataFrame 的列

来自分类Dev

Spark:将 DataFrame 列转换为向量

来自分类Dev

在 Spark Scala 中转置 DataFrame 列

来自分类Dev

Spark DataFrame选择空值

来自分类Dev

在Spark SQL中连接两个DataFrame并选择仅一个的列

来自分类Dev

从Spark Scala DataFrame中选择名称包含特定字符串的列

Related 相关文章

  1. 1

    来自 DataFrame 的 Spark Scala 动态列选择

  2. 2

    基于Spark中列值的动态regexp_extract

  3. 3

    从 Spark 列中提取单词

  4. 4

    在Apache Spark中将Dataframe的列值提取为列表

  5. 5

    Scala Spark->从DataFrame中选择前15列

  6. 6

    Spark dataframe 列内容修改

  7. 7

    如何基于Spark Scala中的列dtypes返回DataFrame列的子集

  8. 8

    如何在Spark DataFrame中基于B列获取A列的5条记录

  9. 9

    带有Spark的Elasticsearch,基于数据框列的动态索引创建

  10. 10

    Spark:将 UDF 应用于 Dataframe 基于 DF 中的值生成新列

  11. 11

    根据Spark中的列值选择文字

  12. 12

    选择列RDD scala-spark

  13. 13

    提取Spark Dataframe中已解码列的特定结构字段

  14. 14

    如何将时间戳的小时部分提取到DataFrame Spark中的新列

  15. 15

    基于 Spark 中 2 列组合的新列

  16. 16

    Scala + Spark + Dataframe异常当我尝试动态转换列并分配排序顺序时

  17. 17

    Spark DataFrame 不允许在 group by 中具有窗口函数的多个动态列列表

  18. 18

    Spark DataFrame将多行转换为列

  19. 19

    Spark DataFrame使列空值变为空

  20. 20

    Spark:保存由“虚拟”列划分的DataFrame

  21. 21

    Spark DataFrame从子查询添加列

  22. 22

    遍历C#中的spark dataframe列

  23. 23

    Spark Dataframe中多列的每行排名

  24. 24

    Spark Hive:无法检索 DataFrame 的列

  25. 25

    Spark:将 DataFrame 列转换为向量

  26. 26

    在 Spark Scala 中转置 DataFrame 列

  27. 27

    Spark DataFrame选择空值

  28. 28

    在Spark SQL中连接两个DataFrame并选择仅一个的列

  29. 29

    从Spark Scala DataFrame中选择名称包含特定字符串的列

热门标签

归档