Reactive-Kafka Stream Consumer:デッドレターが発生しました

anshul_cached

akkaのリアクティブkafkaライブラリを使用してKafkaからのメッセージを消費しようとしています。1つのメッセージが印刷され、その後、

[INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSourceConsumerMain/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://CommittableSourceConsumerMain/deadLetters] to Actor[akka://CommittableSourceConsumerMain/system/kafka-consumer-1#-1726905274] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

これは私が実行しているコードです

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import play.api.libs.json._
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

object CommittableSourceConsumerMain extends App {

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()
  val consumerSettings =ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer).withBootstrapServers("localhost:9092").withGroupId("CommittableSourceConsumer").withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val done =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        val record=(msg.record.value())
        val data=Json.parse(record)

         val recordType=data \ "data" \"event" \"type"

        val actualData=data \ "data" \ "row"

        if(recordType.as[String]=="created"){
          "Some saving logic"
      }

      else{

        "Some logic"

      }
        msg.committableOffset.commitScaladsl()
      }
      .runWith(Sink.ignore)
}
anshul_cached

私はついに解決策を見つけました。ストリームの実行時例外Futureが原因で、失敗のaが返され、ストリームがすぐに終了します。Akka-streamは、ランタイム例外を提供または表示しません。例外を知るために

done.onFailure{
        case NonFatal(e)=>println(e)
      }

例外はif-elseブロックにありました。また、例外が発生した場合、アクターストラテジーを使用してストリームを再開できます。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Stream: Consumer commit frequency

分類Dev

Kafka Stream Ksql Json

分類Dev

Kafka Stream BoundedMemoryRocksDBConfig

分類Dev

debezium-postgresのkafka-streamからkafka-streamデータを読み取ることができません

分類Dev

Kafka stream processor thread safe?

分類Dev

Kafka stream count distinct in scala?

分類Dev

Kafkaテンプレート:kafkaテンプレートによるメッセージの送信中にタイムアウト例外が発生しました

分類Dev

Kafka Stream:グレースフルシャットダウン

分類Dev

KafkaでSpring Cloud Streamを使用した重複メッセージ処理

分類Dev

Spark KafkaStreamに対するFlinkKafka Streamの利点は?そして、Flink上のKafka Stream?

分類Dev

Spring Cloud Stream Kafka with Confluentは、Spring Kafka with Confluentと同じメッセージを生成しません

分類Dev

0.7から0.8.1.1にアップグレードした後、埋め込みkafkaキューへの生成中にエラーが発生しました

分類Dev

Kafka Stream:アプリケーションの再起動時のKafka WindowedStreamの動作

分類Dev

Confluent.Kafkaユニットテストメッセージ.NETCoreのアップグレード中にエラーが発生しました

分類Dev

How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

分類Dev

spring-cloud-streamのDLQの分割されたKafkaトピック

分類Dev

特定のフィールドでのKafka Stream-GlobalKTable結合

分類Dev

Spring Cloud Stream埋め込みヘッダー形式(Kafka)

分類Dev

kafkaでトピックを作成中にエラーが発生しました

分類Dev

Spring 5 Web Reactive - Web Client - Use of flatmap() on the response stream

分類Dev

System.Reactive - Combining 2 observables into a stream with all permutations

分類Dev

Spring Cloud Stream Kafkaは、1つのバインディングにのみヘッダーを追加します

分類Dev

Kafka_consumer_offsetsはレコードの一貫性を生み出しました

分類Dev

Kafkaの起動中にエラーが発生しました

分類Dev

Kafka Stream Suppressは、ウィンドウ処理後に出力を生成しません

分類Dev

Spring Cloud Stream(Hoxton)Kafka Producer / ConsumerがEmbeddedKafkaとの統合テストで機能しない

分類Dev

Spring-kafkaを使用したKafkaのデッドレターキュー(DLQ)

分類Dev

kafka-stream:CorruptRecordExceptionを取得

分類Dev

External system queries during Kafka Stream processing

Related 関連記事

  1. 1

    Kafka Stream: Consumer commit frequency

  2. 2

    Kafka Stream Ksql Json

  3. 3

    Kafka Stream BoundedMemoryRocksDBConfig

  4. 4

    debezium-postgresのkafka-streamからkafka-streamデータを読み取ることができません

  5. 5

    Kafka stream processor thread safe?

  6. 6

    Kafka stream count distinct in scala?

  7. 7

    Kafkaテンプレート:kafkaテンプレートによるメッセージの送信中にタイムアウト例外が発生しました

  8. 8

    Kafka Stream:グレースフルシャットダウン

  9. 9

    KafkaでSpring Cloud Streamを使用した重複メッセージ処理

  10. 10

    Spark KafkaStreamに対するFlinkKafka Streamの利点は?そして、Flink上のKafka Stream?

  11. 11

    Spring Cloud Stream Kafka with Confluentは、Spring Kafka with Confluentと同じメッセージを生成しません

  12. 12

    0.7から0.8.1.1にアップグレードした後、埋め込みkafkaキューへの生成中にエラーが発生しました

  13. 13

    Kafka Stream:アプリケーションの再起動時のKafka WindowedStreamの動作

  14. 14

    Confluent.Kafkaユニットテストメッセージ.NETCoreのアップグレード中にエラーが発生しました

  15. 15

    How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

  16. 16

    spring-cloud-streamのDLQの分割されたKafkaトピック

  17. 17

    特定のフィールドでのKafka Stream-GlobalKTable結合

  18. 18

    Spring Cloud Stream埋め込みヘッダー形式(Kafka)

  19. 19

    kafkaでトピックを作成中にエラーが発生しました

  20. 20

    Spring 5 Web Reactive - Web Client - Use of flatmap() on the response stream

  21. 21

    System.Reactive - Combining 2 observables into a stream with all permutations

  22. 22

    Spring Cloud Stream Kafkaは、1つのバインディングにのみヘッダーを追加します

  23. 23

    Kafka_consumer_offsetsはレコードの一貫性を生み出しました

  24. 24

    Kafkaの起動中にエラーが発生しました

  25. 25

    Kafka Stream Suppressは、ウィンドウ処理後に出力を生成しません

  26. 26

    Spring Cloud Stream(Hoxton)Kafka Producer / ConsumerがEmbeddedKafkaとの統合テストで機能しない

  27. 27

    Spring-kafkaを使用したKafkaのデッドレターキュー(DLQ)

  28. 28

    kafka-stream:CorruptRecordExceptionを取得

  29. 29

    External system queries during Kafka Stream processing

ホットタグ

アーカイブ