Kafka Streams API:KStreamからKTable

グイド:

ロケーションイベントを送信するKafkaトピックがあります(key = user_id、value = user_location)。私はそれを読んで処理することができますKStream

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

それはうまくいきますが、私KTableは各ユーザーの最後の既知の位置を持つ必要があります。どうすればできますか?

中間トピックへの書き込みと中間トピックからの読み取りを行うことができます。

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

KTableからを取得する簡単な方法はありKStreamますか?これはKafka Streamsを使用する私の最初のアプリなので、おそらく何か明らかなものが見当たらないでしょう。

Matthias J. Sax:

更新:

カフカ2.5では、新しいメソッドがKStream#toTable()追加され、それは、変換するための便利な方法を提供しますKStreamにしますKTable詳細については、https//cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSLを参照してください。

元の答え:

現在、これを行う簡単な方法はありません。Confluent FAQで説明されているように、あなたのアプローチは絶対的に有効です:http : //docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an -集約ステップ

これは、コードに関して最も簡単な方法です。ただし、(a)追加のトピックを管理する必要があり、(b)Kafkaにデータが書き込まれ、Kafkaからデータが再読み取りされるため、ネットワークトラフィックが増えるという欠点があります。

「ダミーリデュース」を使用する1つの代替方法があります。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");

このアプローチは、オプション1に比べてコードに関しては多少複雑ですが、(a)手動によるトピック管理が不要であり、(b)Kafkaからデータを再度読み取る必要がないという利点があります。

全体として、あなたは自分で決める必要があります。

オプション2では、Kafka Streamsは内部の変更ログトピックを作成して、フォールトトレランスのためにKTableをバックアップします。したがって、どちらのアプローチでも、Kafkaに追加のストレージが必要になり、ネットワークトラフィックが増加します。全体として、これはオプション2のやや複雑なコードとオプション1の手動によるトピック管理の間のトレードオフです。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams API:KStreamからKTable

分類Dev

Kafka Streams –同じトピックでKTableとKStreamを取得するための最良の方法は?

分類Dev

Kafka Streams:RocksDB TTL

分類Dev

Kafka Connect and Streams

分類Dev

Kafka Streams TimestampExtractor

分類Dev

Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

分類Dev

Scala-KStream(Kafka Streams)をフィルタリングする方法

分類Dev

Kafka Streams local state stores

分類Dev

Kafka streams aggregate throwing Exception

分類Dev

Kafka Streams GlobalKTable sychronization to applications

分類Dev

Kafka Streams: POJO serialization/deserialization

分類Dev

Kafka Connect vs Streams for Sinks

分類Dev

Kafka Streams - missing source topic

分類Dev

Kafka:コンシューマーAPIとStreams API

分類Dev

メッセージハブでのKafka Streams KTable構成エラー

分類Dev

Kafka Streams API:セッションウィンドウの例外

分類Dev

Kafka Streams APIのArrayList Serdeに関する問題

分類Dev

Is it possible to use @KafkaStreamsStateStore annotation on Spring Cloud Stream Kafka Streams 3.0 Binder style API methods?

分類Dev

Kafka Streamsプロセッサを.Netに実装しますか?

分類Dev

Kafka Streamsで別々のトピックに参加しますか?

分類Dev

Kafka Streams:GlobalStoreはスレッドセーフですか?

分類Dev

Kafka Streams:ConsumerRebalanceListenerの実装

分類Dev

Why is windowing now working for Kafka Streams?

分類Dev

NPE while deserializing avro messages in kafka streams

分類Dev

Kafka Streams:1つのレコードから複数のレコード

分類Dev

kafka-streamsの最新のオフセットから常に消費する方法

分類Dev

メッセージの前処理(トピック-トピック)-Kafka Connect API vs. Streams vs Kafka Consumer?

分類Dev

Kafka Streams KTable Storeは、この場合、圧縮された入力トピック、代替案には役立ちませんか?

Related 関連記事

  1. 1

    Kafka Streams API:KStreamからKTable

  2. 2

    Kafka Streams API:KStreamからKTable

  3. 3

    Kafka Streams –同じトピックでKTableとKStreamを取得するための最良の方法は?

  4. 4

    Kafka Streams:RocksDB TTL

  5. 5

    Kafka Connect and Streams

  6. 6

    Kafka Streams TimestampExtractor

  7. 7

    Message pre-processing (topic - topic) - Kafka Connect API vs. Streams vs Kafka Consumer?

  8. 8

    Scala-KStream(Kafka Streams)をフィルタリングする方法

  9. 9

    Kafka Streams local state stores

  10. 10

    Kafka streams aggregate throwing Exception

  11. 11

    Kafka Streams GlobalKTable sychronization to applications

  12. 12

    Kafka Streams: POJO serialization/deserialization

  13. 13

    Kafka Connect vs Streams for Sinks

  14. 14

    Kafka Streams - missing source topic

  15. 15

    Kafka:コンシューマーAPIとStreams API

  16. 16

    メッセージハブでのKafka Streams KTable構成エラー

  17. 17

    Kafka Streams API:セッションウィンドウの例外

  18. 18

    Kafka Streams APIのArrayList Serdeに関する問題

  19. 19

    Is it possible to use @KafkaStreamsStateStore annotation on Spring Cloud Stream Kafka Streams 3.0 Binder style API methods?

  20. 20

    Kafka Streamsプロセッサを.Netに実装しますか?

  21. 21

    Kafka Streamsで別々のトピックに参加しますか?

  22. 22

    Kafka Streams:GlobalStoreはスレッドセーフですか?

  23. 23

    Kafka Streams:ConsumerRebalanceListenerの実装

  24. 24

    Why is windowing now working for Kafka Streams?

  25. 25

    NPE while deserializing avro messages in kafka streams

  26. 26

    Kafka Streams:1つのレコードから複数のレコード

  27. 27

    kafka-streamsの最新のオフセットから常に消費する方法

  28. 28

    メッセージの前処理(トピック-トピック)-Kafka Connect API vs. Streams vs Kafka Consumer?

  29. 29

    Kafka Streams KTable Storeは、この場合、圧縮された入力トピック、代替案には役立ちませんか?

ホットタグ

アーカイブ