如何在Spark中将结构化数据映射到schemaRDD?

肖恩

我之前问过这个问题的方式有所不同,但是有一些变化,所以我想作为一个新问题再次提出。我有一个结构化的数据,其中只有一部分是json格式,但我需要将整个数据映射到schemaRDD。数据如下所示:

03052015 04:13:20 {“ recordType”:“ NEW”,“ data”:{“ keycol”:“ val1”,“ col2”:“ val2”,“ col3”:“ val3”}

每行以日期开头,后跟时间和json格式的文本。我不仅需要将json中的文本映射,还需要将日期和时间映射到相同的结构中。

我在Python中尝试过,但显然行不通,因为Row不接受RDD(在这种情况下为jsonRDD)。

    from pyspark.sql import SQLContext, Row
    sqlContext = SQLContext(sc)
    orderFile  = sc.textFile(myfile)
    orderLine  = orderFile.map(lambda line: line.split(" ", 2))
    anotherOrderLine = orderLine.map(lambda p: Row(date=p[0], time=p[1], content=sqlContext.jsonRDD(p[3])))
    schemaOrder = sqlContext.inferSchema(anotherOrderLine)
    schemaOrder.printSchema()
    for x in schemaOrder.collect():
        print x

目标是能够针对schemaRDD运行如下查询:

select date, time, data.keycol, data.val1, data.val2, data.val3 from myOrder

如何将整行映射到schemaRDD?

任何帮助表示赞赏吗?

0x0FFF

最简单的选择是将此字段添加到JSON并使用jsonRDD

我的资料:

03052015 04:13:20 {"recordType":"NEW","data":{"keycol":"val1","col1":"val5","col2":"val3"}}
03062015 04:13:20 {"recordType":"NEW1","data":{"keycol":"val2","col1":"val6","col2":"val3"}}
03072015 04:13:20 {"recordType":"NEW2","data":{"keycol":"val3","col1":"val7","col2":"val3"}}
03082015 04:13:20 {"recordType":"NEW3","data":{"keycol":"val4","col1":"val8","col2":"val3"}}

代码:

import json

def transform(data):
    ts  = data[:18].strip()
    jss = data[18:].strip()
    jsj = json.loads(jss)
    jsj['ts'] = ts
    return json.dumps(jsj)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.textFile('/sparkdemo/sample.data')
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.registerTempTable("myOrder")

sqlContext.sql("select ts, recordType, data.keycol, data.col1, data.col2 data from myOrder").collect()

结果:

[Row(ts=u'03052015 04:13:20', recordType=u'NEW', keycol=u'val1', col1=u'val5', data=u'val3'), Row(ts=u'03062015 04:13:20', recordType=u'NEW1', keycol=u'val2', col1=u'val6', data=u'val3'), Row(ts=u'03072015 04:13:20', recordType=u'NEW2', keycol=u'val3', col1=u'val7', data=u'val3'), Row(ts=u'03082015 04:13:20', recordType=u'NEW3', keycol=u'val4', col1=u'val8', data=u'val3')]

在您的代码中,存在一个问题,您正在为每行调用jsonRDD,这是不正确的-它接受RDD并返回SchemaRDD。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Spark中将结构化数据映射到schemaRDD?

来自分类Dev

如何在 Python 中将 JSON 结构化数据写入文本文件?

来自分类Dev

如何在C ++中将结构映射到char适当的方法

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

如何在Moodle中将数据根映射到Azure存储

来自分类Dev

在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

来自分类Dev

如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

来自分类Dev

如何在cython中将C结构从C结构映射到int?

来自分类Dev

如何在Rust中将一个结构映射到另一个结构?

来自分类Dev

Knockoutjs映射和非结构化数据

来自分类Dev

Knockoutjs映射和非结构化数据

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

如何在Spring MVC中将jsp视图中的复杂结构映射到模型对象

来自分类Dev

如何在PLC的结构化文本中将ASCII值写入字符串?

来自分类Dev

如何在Postgres中将JavaScript对象转换为结构化jsonb?

来自分类Dev

如何在Rust中将C / C ++ void *取消引用以进行结构化或回调?

来自分类Dev

如何在PLC的结构化文本中将ASCII值写入字符串?

来自分类Dev

如何在Kafka Direct Stream中使用Spark结构化流?

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

如何在 Marshmallow 中反序列化不同的结构化 JSON 数据?

来自分类Dev

在Spark RDD中将Cassandra行映射到参数化类型

来自分类Dev

在Spark RDD中将Cassandra行映射到参数化类型

来自分类Dev

如何将这些非结构化数据转化为结构化数据?

来自分类Dev

Apache Spark:使用结构化数据是否好

来自分类Dev

从Spark结构化流以JSON数组形式写入数据

来自分类Dev

如何在ggvis中将数值映射到映射的填充属性?

来自分类Dev

如何在数组中添加结构化数据模式

来自分类Dev

如何在C ++中加载数据库以进行结构化?

来自分类Dev

如何在非结构化数据中的特定字符串之前提取日期?

Related 相关文章

  1. 1

    如何在Spark中将结构化数据映射到schemaRDD?

  2. 2

    如何在 Python 中将 JSON 结构化数据写入文本文件?

  3. 3

    如何在C ++中将结构映射到char适当的方法

  4. 4

    如何删除Spark结构化流创建的旧数据?

  5. 5

    如何在Moodle中将数据根映射到Azure存储

  6. 6

    在Spark结构化流中将数据内部联接到左联接的DataFrame时丢失条目

  7. 7

    如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

  8. 8

    如何在cython中将C结构从C结构映射到int?

  9. 9

    如何在Rust中将一个结构映射到另一个结构?

  10. 10

    Knockoutjs映射和非结构化数据

  11. 11

    Knockoutjs映射和非结构化数据

  12. 12

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  13. 13

    如何在Spring MVC中将jsp视图中的复杂结构映射到模型对象

  14. 14

    如何在PLC的结构化文本中将ASCII值写入字符串?

  15. 15

    如何在Postgres中将JavaScript对象转换为结构化jsonb?

  16. 16

    如何在Rust中将C / C ++ void *取消引用以进行结构化或回调?

  17. 17

    如何在PLC的结构化文本中将ASCII值写入字符串?

  18. 18

    如何在Kafka Direct Stream中使用Spark结构化流?

  19. 19

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  20. 20

    如何在 Marshmallow 中反序列化不同的结构化 JSON 数据?

  21. 21

    在Spark RDD中将Cassandra行映射到参数化类型

  22. 22

    在Spark RDD中将Cassandra行映射到参数化类型

  23. 23

    如何将这些非结构化数据转化为结构化数据?

  24. 24

    Apache Spark:使用结构化数据是否好

  25. 25

    从Spark结构化流以JSON数组形式写入数据

  26. 26

    如何在ggvis中将数值映射到映射的填充属性?

  27. 27

    如何在数组中添加结构化数据模式

  28. 28

    如何在C ++中加载数据库以进行结构化?

  29. 29

    如何在非结构化数据中的特定字符串之前提取日期?

热门标签

归档