Spark Structured Streaming (pyspark)을 사용하여 (스키마 및 페이로드)로 Kafka Connect JSONConverter 메시지에서 "페이로드"추출

죽임

내가 달성하려는 것은 정확히이 질문에 대한 ( Here )입니다. 제 경우에는 Scala가 아닌 Python / Pyspark를 사용하고 있습니다.

스키마를 포함하는 Kafka 연결 메시지의 "페이로드"부분을 추출하려고합니다.

샘플 메시지 :

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

1 단계- "페이로드"부분에 대한 스키마 정의 :

payload_schema = StructType([
StructField("emp_id", StringType(), False),
StructField("emp_name", StringType(), True),
StructField("city", StringType(), True),
StructField("emp_sal", StringType(), True),
StructField("manager_name", StringType(), True)])

2 단계-Kafka에서 읽기 :

df =spark.readStream.format("kafka")

3 단계-Kafka 메시지에서 메시지 값 가져 오기 :

kafka_df = df.selectExpr("CAST(value AS STRING)")

4 단계- "페이로드"만 추출 ( 여기에 갇혀 있음 ) :

    import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
    .select("DF.*")

from_json () 함수에 전달하기 전에 JSON 문자열에서 페이로드를 추출하는 방법을 알아낼 수 없었기 때문에이 부분에 갇혀 있습니다.

참고 : 그러나 from_json ()에서 사용하기 전에 전체 메시지에 대한 전체 스키마를 정의해야한다는 것을 알고 있습니다. "페이로드"json 문자열 부분 만 얻으려고합니다.

마이크

SQL 함수를 사용할 수 있습니다 get_json_object.

import pyspark.sql.functions as psf

kafka_df
  .select(psf.get_json_object(kafka_df['value'],"$.payload").alias('payload'))
  .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF"))
  .select("DF.*")

또는에서 사용하기 전에 전체 메시지에 대한 전체 스키마를 정의해야합니다 from_json.

즉, 스키마가 아래와 같이 표시되어야합니다.

full_schema = StructType([
  StructField("schema", StructType([
    StructField("type", StringType(), False),
    StructField("name", StringType(), False),
    StructField("fields", StructType([
      StructField("field", StringType(), False),
      StructField("type", StringType(), False)
    ]),
  StructField("payload", StructType([
    StructField("emp_id", StringType(), False),
    StructField("emp_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("emp_sal", StringType(), True),
    StructField("manager_name", StringType(), True)
  ])
])

파이썬에서 스키마 내에서 배열을 정의하는 방법을 완전히 모르기 때문에이 스키마 정의를 다시 확인하십시오. 그러나 아이디어가 명확하기를 바랍니다.

이 작업이 완료되면 다음을 통해 페이로드 필드를 선택할 수 있습니다.

import pyspark.sql.functions as psf

    emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), full_schema).alias("DF"))\
    .select("DF.payload.*")

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

Related 관련 기사

뜨겁다태그

보관