Kafkaトピック内の特定のパーティションからSparkを使用してデータをストリーミングする

Amr916

私はすでにここをクリックして同様の質問を見ました

しかし、それでも特定のパーティションからデータをストリーミングできないかどうか知りたいですか?SparkStreamingサブスクライブメソッドでKafkaConsumerStrategiesを使用しました

ConsumerStrategies.Subscribe [String、String](topics、kafkaParams、offsets)

これは、トピックとパーティションをサブスクライブするために試したコードスニペットです。

val topics = Array("cdc-classic")
val topic="cdc-classic"
val partition=2;
val offsets= 
Map(new TopicPartition(topic, partition) -> 2L)//I am not clear with this line, (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams,offsets))

しかし、このコードを実行すると、次の例外が発生します。

     Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)

PS:cdc-classicは、17個のパーティションを持つトピック名です

Amr916

この行にデータをストリーミングするには、パーティション番号とパーティションの開始オフセットを指定します。

Map(new TopicPartition(topic, partition) -> 2L)

どこ、

  • パーティションはパーティション番号です

  • 2Lは、パーティションの開始オフセット番号を示します。

次に、選択したパーティションからデータをストリーミングできます。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

トピック内の特定のパーティションからのストリーミング(Kafka Streams)

分類Dev

Sparkストリーミング-Kafkaトピックの特定のパーティションを消費することは可能ですか?

分類Dev

kafkaとsparkストリーミングを使用して直接ストリームを作成する前に、トピックのパーティション数を取得しますか?

分類Dev

Kafkaコンシューマー-特定のkafkaトピックパーティションからのイベントのポーリングを一時停止して、それを遅延キューとして使用します

分類Dev

KafkaトピックパーティションをSparkストリーミングに

分類Dev

Kafkaを使用したデータモデリング?トピックとパーティション

分類Dev

複数のワーカー(同じ数のパーティション)を使用して、同じトピックでkafkaコンシューマーアプリケーションをスケーリングする方法

分類Dev

パターンマッチングを使用して特定のディレクトリをコピーするBashスクリプト

分類Dev

spring-kafkaを使用して、特定のオフセットで特定のトピックとパーティションから古いkafkaメッセージを再送信(読み取り)する方法は?

分類Dev

ターゲットディレクトリのパーミッションをmvして採用する方法

分類Dev

KafkaクライアントからのKafkaトピックのパーティション数を増やす

分類Dev

特定のディレクトリをあるコンピュータから別のコンピュータにミラーリングしますか?

分類Dev

NodeJSの複数のKafkaトピックパーティションにデータを送信する方法

分類Dev

Kafkaは、トピック内の複数のオブジェクトをストリーミングし、非セリゼーションを行います

分類Dev

Spark Structured Streaming Kafka Writerは、特定のパーティションへのデータの書き込みをサポートしていますか?

分類Dev

kafkaトピックのパーティションのメモリ使用量を確認する方法は?

分類Dev

Kafkaトピックのパーティション数を減らす方法は?

分類Dev

TreeMapを使用して、ディレクトリ内のテキストファイルのコレクションから読み取っているデータをキャッシュするにはどうすればよいですか?

分類Dev

特定のインターフェースを介してデーモンからのトラフィックをルーティングする

分類Dev

Spring Bootを使用したkafkaトピックのカスタムパーティション

分類Dev

カートにリダイレクトするカスタムテーブルからのセッションデータを設定して使用する

分類Dev

スクリプトユーティリティを使用してすべてのターミナルセッションを自動的に記録する方法

分類Dev

kafkaトピックでデータパーティションを賢く見ることは可能ですか?

分類Dev

kafkaがトピックのパーティション数を取得する

分類Dev

Linuxでディスクをデタッチする方法(パーティション/ディスクのクラッシュをシミュレートする)?

分類Dev

プログラムで(JAVA)Kafkaトピックからパーティションを作成/削除します

分類Dev

kafkaで他のVMからSparkストリーミングを使用する方法

分類Dev

Kafkaを使用したSpark構造化ストリーミングで、sparkが複数のトピックのオフセットを管理する方法

分類Dev

Sparkを使用してS3データセットから最新のパーティションを効率的に見つける方法

Related 関連記事

  1. 1

    トピック内の特定のパーティションからのストリーミング(Kafka Streams)

  2. 2

    Sparkストリーミング-Kafkaトピックの特定のパーティションを消費することは可能ですか?

  3. 3

    kafkaとsparkストリーミングを使用して直接ストリームを作成する前に、トピックのパーティション数を取得しますか?

  4. 4

    Kafkaコンシューマー-特定のkafkaトピックパーティションからのイベントのポーリングを一時停止して、それを遅延キューとして使用します

  5. 5

    KafkaトピックパーティションをSparkストリーミングに

  6. 6

    Kafkaを使用したデータモデリング?トピックとパーティション

  7. 7

    複数のワーカー(同じ数のパーティション)を使用して、同じトピックでkafkaコンシューマーアプリケーションをスケーリングする方法

  8. 8

    パターンマッチングを使用して特定のディレクトリをコピーするBashスクリプト

  9. 9

    spring-kafkaを使用して、特定のオフセットで特定のトピックとパーティションから古いkafkaメッセージを再送信(読み取り)する方法は?

  10. 10

    ターゲットディレクトリのパーミッションをmvして採用する方法

  11. 11

    KafkaクライアントからのKafkaトピックのパーティション数を増やす

  12. 12

    特定のディレクトリをあるコンピュータから別のコンピュータにミラーリングしますか?

  13. 13

    NodeJSの複数のKafkaトピックパーティションにデータを送信する方法

  14. 14

    Kafkaは、トピック内の複数のオブジェクトをストリーミングし、非セリゼーションを行います

  15. 15

    Spark Structured Streaming Kafka Writerは、特定のパーティションへのデータの書き込みをサポートしていますか?

  16. 16

    kafkaトピックのパーティションのメモリ使用量を確認する方法は?

  17. 17

    Kafkaトピックのパーティション数を減らす方法は?

  18. 18

    TreeMapを使用して、ディレクトリ内のテキストファイルのコレクションから読み取っているデータをキャッシュするにはどうすればよいですか?

  19. 19

    特定のインターフェースを介してデーモンからのトラフィックをルーティングする

  20. 20

    Spring Bootを使用したkafkaトピックのカスタムパーティション

  21. 21

    カートにリダイレクトするカスタムテーブルからのセッションデータを設定して使用する

  22. 22

    スクリプトユーティリティを使用してすべてのターミナルセッションを自動的に記録する方法

  23. 23

    kafkaトピックでデータパーティションを賢く見ることは可能ですか?

  24. 24

    kafkaがトピックのパーティション数を取得する

  25. 25

    Linuxでディスクをデタッチする方法(パーティション/ディスクのクラッシュをシミュレートする)?

  26. 26

    プログラムで(JAVA)Kafkaトピックからパーティションを作成/削除します

  27. 27

    kafkaで他のVMからSparkストリーミングを使用する方法

  28. 28

    Kafkaを使用したSpark構造化ストリーミングで、sparkが複数のトピックのオフセットを管理する方法

  29. 29

    Sparkを使用してS3データセットから最新のパーティションを効率的に見つける方法

ホットタグ

アーカイブ