如何在Spark SQL Java中将CSV类型字符串转换为数据帧?

黄若瑟

我使用Spark结构化的流API制作Spark Java客户端代码。这些代码从kafka中提取csv类型字符串

SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();
        
Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092"))
            .option("subscribe", "topicForMongoDB")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING)");
            
df.show();

返回的结果成功。这些代码打印csv类型字符串。

+--------------------+
|               value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|

然后,我尝试将这些字符串转换为spark sql中的spark数据框。首先,以下代码是java pojo类

public class EntityMongoDB implements Serializable {

    private Date date;
    private float value;
    private String id;
    private String title;
    private String state;
    private String frequency_short;
    private String units_short;
    private String seasonal_adjustment_short;
    
    private static StructType structType = DataTypes.createStructType(new StructField[] {
              
              DataTypes.createStructField("date", DataTypes.DateType, false),
              DataTypes.createStructField("value", DataTypes.FloatType, false),
              DataTypes.createStructField("id", DataTypes.StringType, false),
              DataTypes.createStructField("title", DataTypes.StringType, false),
              DataTypes.createStructField("state", DataTypes.StringType, false),
              DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
              DataTypes.createStructField("units_short", DataTypes.StringType, false),
              DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
    });
    
    public static StructType getStructType() {
        return structType;
    }
}

我编写代码将这些csv类型的字符串转换为数据帧

Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id", 
                "entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short", 
                "entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();

dfs.show();
dfs.printSchema();

打印的架构是正确的。

 |-- date: date (nullable = true)
 |-- value: float (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- state: string (nullable = true)
 |-- frequency_short: string (nullable = true)
 |-- units_short: string (nullable = true)
 |-- seasonal_adjustment_short: string (nullable = true)

但是生成的列充满了空值

+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value|  id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|

我认为dataframe的架构生成正确,但是提取数据部分存在一些问题。任何答复将是感激的。最好的祝福

黑主教

您在该value列中使用的字符串不是有效的JSON,因此from_json在这里不起作用。

对于Spark 3+,您可以使用from_csv@mck的注释中指出的内容:

Dataset<Row> dfs = df.select(from_csv(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.*").toDF(); 

对于3之前的Spark版本,您可以split使用逗号将值,然后将结果数组转换成多列:

Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
        .select(IntStream.range(0, 7).map(i -> col("values").getItem(i)).toArray())
        .toDF("date", "value", "id", "title", "state", "frequency_short", "units_short", "seasonal_adjustment_short"); 

另外,似乎您在值中包含列名,您可以过滤掉该行。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在spark中将字符串名称转换为sql数据类型

来自分类Dev

如何在 PL/SQL 中将 CSV 字符串转换为多列结果?

来自分类Dev

如何在python中将字符串数据帧转换为csv?

来自分类Dev

如何在Spark中将数据框的列类型从字符串转换为(数组和结构)

来自分类Dev

如何在SQL Server 2008中将加密的数据转换为字符串并解密数据

来自分类Dev

如何在SQL Server 2008中将加密的数据转换为字符串并解密数据

来自分类Dev

如何在Java中将java.sql.blob转换为base64编码的字符串

来自分类Dev

如何在Java中将java.sql.blob转换为base64编码的字符串

来自分类Dev

如何在SQL Server 2008中将字符串转换为整数?

来自分类Dev

如何在包含GROUP BY的SQL查询中将月份从数字转换为字符串形式?

来自分类Dev

如何在SQL Server中将XML字符串值转换为日期时间

来自分类Dev

如何在SQL中将字符串YYYYMM转换为datetime?

来自分类Dev

如何在Hive SQL中将数组转换为字符串?

来自分类Dev

如何在Databricks SQL中将字符串转换为日期

来自分类Dev

如何在SQL Server中将字符串转换为日期时间

来自分类Dev

如何在SQL Server中将表的整数行转换为逗号分隔的字符串

来自分类Dev

如何在SQL Server 2008中将字符串转换为整数?

来自分类Dev

如何在 SQL Server 中将字符串从 dd mmm yyyy 格式转换为日期

来自分类Dev

如何在Spark上将JSON字符串转换为数据帧

来自分类Dev

如何在熊猫中将列的数据类型从字符串转换为列表?

来自分类Dev

如何在Postgres中将字符串类型数据转换为日期格式?

来自分类Dev

在SQL Server中将图像数据类型转换为字符串

来自分类Dev

如何在Java中将char []转换为字符串?

来自分类Dev

如何在Java中将字符串转换为身份

来自分类Dev

如何在SQL Server中将字符串日期转换为另一种日期格式?

来自分类Dev

如何在SQL Server 2008中将字符串日期转换为有效的日期格式?

来自分类Dev

在Spark Dataframe中将字符串数据类型列转换为MapType

来自分类Dev

如何在Java或android中将URL中存在的JSON数据转换为JSON字符串?

来自分类Dev

如何在Java中将RFID标签数据(Askii)转换为字符串

Related 相关文章

  1. 1

    在spark中将字符串名称转换为sql数据类型

  2. 2

    如何在 PL/SQL 中将 CSV 字符串转换为多列结果?

  3. 3

    如何在python中将字符串数据帧转换为csv?

  4. 4

    如何在Spark中将数据框的列类型从字符串转换为(数组和结构)

  5. 5

    如何在SQL Server 2008中将加密的数据转换为字符串并解密数据

  6. 6

    如何在SQL Server 2008中将加密的数据转换为字符串并解密数据

  7. 7

    如何在Java中将java.sql.blob转换为base64编码的字符串

  8. 8

    如何在Java中将java.sql.blob转换为base64编码的字符串

  9. 9

    如何在SQL Server 2008中将字符串转换为整数?

  10. 10

    如何在包含GROUP BY的SQL查询中将月份从数字转换为字符串形式?

  11. 11

    如何在SQL Server中将XML字符串值转换为日期时间

  12. 12

    如何在SQL中将字符串YYYYMM转换为datetime?

  13. 13

    如何在Hive SQL中将数组转换为字符串?

  14. 14

    如何在Databricks SQL中将字符串转换为日期

  15. 15

    如何在SQL Server中将字符串转换为日期时间

  16. 16

    如何在SQL Server中将表的整数行转换为逗号分隔的字符串

  17. 17

    如何在SQL Server 2008中将字符串转换为整数?

  18. 18

    如何在 SQL Server 中将字符串从 dd mmm yyyy 格式转换为日期

  19. 19

    如何在Spark上将JSON字符串转换为数据帧

  20. 20

    如何在熊猫中将列的数据类型从字符串转换为列表?

  21. 21

    如何在Postgres中将字符串类型数据转换为日期格式?

  22. 22

    在SQL Server中将图像数据类型转换为字符串

  23. 23

    如何在Java中将char []转换为字符串?

  24. 24

    如何在Java中将字符串转换为身份

  25. 25

    如何在SQL Server中将字符串日期转换为另一种日期格式?

  26. 26

    如何在SQL Server 2008中将字符串日期转换为有效的日期格式?

  27. 27

    在Spark Dataframe中将字符串数据类型列转换为MapType

  28. 28

    如何在Java或android中将URL中存在的JSON数据转换为JSON字符串?

  29. 29

    如何在Java中将RFID标签数据(Askii)转换为字符串

热门标签

归档