Kafkaトピックからストリームを読み取るときに、Spark Structured Streamingにタイムアウトの問題がありますか?

yyuankm

構造化ストリーミングでforeachbatchを使用してkafkaトピックからストリームを読み取るSparkジョブを実装しました。

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
  .option("subscribe", "test-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
  .option("kafka.ssl.truststore.password", tspass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("groupIdPrefix","MY_GROUP_ID")
  .load()

val streamservice = df.selectExpr("CAST(value AS STRING)")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")


var stream_df = streamservice
  .selectExpr("cast(id as string) id", "cast(x as int) x")

val monitoring_stream = stream_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if(!batchDF.isEmpty) { }
  }
  .start()
  .awaitTermination()

以下の質問があります。

  1. kafkaトピックに長期間データがない場合、stream_df.writeStreamは自動的に終了しますか?これにタイムアウト制御はありますか?

  2. kafkaトピックがkafkaブローカーから削除された場合、stream_df.writeStreamは終了しますか?

上記の2つのケースで、sparkジョブが終了せずにkafkaトピックを監視し続けることを願っています。kafkaコネクタやstream_df.writerstreamに特別な設定が必要ですか?

マイク
  1. kafkaトピックに長期間データがない場合、stream_df.writeStreamは自動的に終了しますか?これにタイムアウト制御はありますか?

クエリの終了は、処理されているデータとは無関係です。Kafkaトピックに対して新しいメッセージが生成されない場合でも、クエリはストリームとして実行されているため、実行を続けます。

私はそれがあなたがテスト中にあなた自身がすでに理解したことだと思います。構造化ストリーミングクエリを使用してKafkaからのデータを処理しており、長時間アイドル状態になることはありません(たとえば、週末の営業時間外)。

  1. kafkaトピックがkafkaブローカーから削除された場合、stream_df.writeStreamは終了しますか?

デフォルトでは、クエリの実行中にKafkaトピックを削除すると、例外がスローされます。

ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".

クエリオプションのfailOnDataLossデフォルトがtrue。であるため、「デフォルト」について説明しました例外メッセージで説明されているように、これをfalseに設定して、ストリーミングクエリを実行させることができます。このオプションは、構造化ストリーミング+ Kafka統合ガイドで次のように説明されています

「データが失われる可能性がある場合(トピックが削除されている、オフセットが範囲外であるなど)にクエリを失敗させるかどうか。これは誤報の可能性があります。期待どおりに機能しない場合は無効にできます。」

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

Related 関連記事

ホットタグ

アーカイブ