ドキュメントからApacheSparkにデータストリームを読み込む標準的な方法は次のとおりです。
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
.load("path/to/data")
しかし、スキーマを適用する前に、いくつかのデータをクリーンアップしていくつかのフィールドを再配置する必要があります。
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
**.map(custom_function)** # apply a custom function to the json object
.load("path/to/data")
構造化ストリーミングを使用してApacheSparkでこれを行う効率的な方法はありますか?
tl; dr簡単に言えば、データセットをロードする前にこれを行うことはできません。
私の頭に浮かぶ唯一の方法は、データセットを文字列のセットとしてロードし、一連の変換withColumn
またはselect
変換でクリーンアップして、事実上あなたになること.map(custom_function)
です。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加