내가 달성하려는 것은 정확히이 질문에 대한 ( Here )입니다. 제 경우에는 Scala가 아닌 Python / Pyspark를 사용하고 있습니다.
스키마를 포함하는 Kafka 연결 메시지의 "페이로드"부분을 추출하려고합니다.
샘플 메시지 :
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
1 단계- "페이로드"부분에 대한 스키마 정의 :
payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])
2 단계-Kafka에서 읽기 :
df =spark.readStream.format("kafka")
3 단계-Kafka 메시지에서 메시지 값 가져 오기 :
kafka_df = df.selectExpr("CAST(value AS STRING)")
4 단계- "페이로드"만 추출 ( 여기에 갇혀 있음 ) :
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
.select("DF.*")
from_json () 함수에 전달하기 전에 JSON 문자열에서 페이로드를 추출하는 방법을 알아낼 수 없었기 때문에이 부분에 갇혀 있습니다.
참고 : 그러나 from_json ()에서 사용하기 전에 전체 메시지에 대한 전체 스키마를 정의해야한다는 것을 알고 있습니다. "페이로드"json 문자열 부분 만 얻으려고합니다.
SQL 함수를 사용할 수 있습니다 get_json_object
.
import pyspark.sql.functions as psf
kafka_df
.select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
.select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
.select("DF.*")
또는에서 사용하기 전에 전체 메시지에 대한 전체 스키마를 정의해야합니다 from_json
.
즉, 스키마가 아래와 같이 표시되어야합니다.
full_schema = StructType([
StructField("schema", StructType([
StructField("type", StringType(), False),
StructField("name", StringType(), False),
StructField("fields", StructType([
StructField("field", StringType(), False),
StructField("type", StringType(), False)
]),
StructField("payload", StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)
])
])
파이썬에서 스키마 내에서 배열을 정의하는 방법을 완전히 모르기 때문에이 스키마 정의를 다시 확인하십시오. 그러나 아이디어가 명확하기를 바랍니다.
이 작업이 완료되면 다음을 통해 페이로드 필드를 선택할 수 있습니다.
import pyspark.sql.functions as psf
emp_df = kafka_df\
.select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
.select("DF.payload.*")
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다