이전에이 질문을 다르게했지만 약간의 변화가있어서 새로운 질문으로 다시 한 번 물어 보겠다고 생각했습니다. 일부만 json 형식 인 구조화 된 데이터가 있지만 전체 데이터를 schemaRDD에 매핑해야합니다. 데이터는 다음과 같습니다.
03052015 04:13:20 { "recordType": "NEW", "data": { "keycol": "val1", "col2": "val2", "col3": "val3"}
각 줄은 날짜와 시간, json 형식의 텍스트로 시작합니다. json의 텍스트뿐만 아니라 날짜와 시간도 동일한 구조로 매핑해야합니다.
Python에서 시도했지만 Row가 RDD (이 경우 jsonRDD)를 사용하지 않기 때문에 분명히 작동하지 않습니다.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
orderFile = sc.textFile(myfile)
orderLine = orderFile.map(lambda line: line.split(" ", 2))
anotherOrderLine = orderLine.map(lambda p: Row(date=p[0], time=p[1], content=sqlContext.jsonRDD(p[3])))
schemaOrder = sqlContext.inferSchema(anotherOrderLine)
schemaOrder.printSchema()
for x in schemaOrder.collect():
print x
목표는 schemaRDD에 대해 다음과 같은 쿼리를 실행할 수있는 것입니다.
select date, time, data.keycol, data.val1, data.val2, data.val3 from myOrder
전체 라인을 schemaRDD에 어떻게 매핑 할 수 있습니까?
도움을 주시면 감사하겠습니다.
가장 간단한 옵션은이 필드를 JSON에 추가하고 jsonRDD를 사용하는 것입니다.
내 데이터 :
03052015 04:13:20 {"recordType":"NEW","data":{"keycol":"val1","col1":"val5","col2":"val3"}}
03062015 04:13:20 {"recordType":"NEW1","data":{"keycol":"val2","col1":"val6","col2":"val3"}}
03072015 04:13:20 {"recordType":"NEW2","data":{"keycol":"val3","col1":"val7","col2":"val3"}}
03082015 04:13:20 {"recordType":"NEW3","data":{"keycol":"val4","col1":"val8","col2":"val3"}}
암호:
import json
def transform(data):
ts = data[:18].strip()
jss = data[18:].strip()
jsj = json.loads(jss)
jsj['ts'] = ts
return json.dumps(jsj)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.textFile('/sparkdemo/sample.data')
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.registerTempTable("myOrder")
sqlContext.sql("select ts, recordType, data.keycol, data.col1, data.col2 data from myOrder").collect()
결과:
[Row(ts=u'03052015 04:13:20', recordType=u'NEW', keycol=u'val1', col1=u'val5', data=u'val3'), Row(ts=u'03062015 04:13:20', recordType=u'NEW1', keycol=u'val2', col1=u'val6', data=u'val3'), Row(ts=u'03072015 04:13:20', recordType=u'NEW2', keycol=u'val3', col1=u'val7', data=u'val3'), Row(ts=u'03082015 04:13:20', recordType=u'NEW3', keycol=u'val4', col1=u'val8', data=u'val3')]
코드에서 각 행에 대해 jsonRDD를 호출하는 문제가 있습니다. 이것은 올바르지 않습니다. RDD를 받아들이고 SchemaRDD를 반환합니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다