ロケーションイベントを送信するKafkaトピックがあります(key = user_id、value = user_location)。私はそれを読んで処理することができますKStream
:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
それはうまくいきますが、私KTable
は各ユーザーの最後の既知の位置を持つ必要があります。どうすればできますか?
中間トピックへの書き込みと中間トピックからの読み取りを行うことができます。
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
KTable
からを取得する簡単な方法はありKStream
ますか?これはKafka Streamsを使用する私の最初のアプリなので、おそらく何か明らかなものが見当たらないでしょう。
更新:
カフカ2.5では、新しいメソッドがKStream#toTable()
追加され、それは、変換するための便利な方法を提供しますKStream
にしますKTable
。詳細については、https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSLを参照してください。
元の答え:
現在、これを行う簡単な方法はありません。Confluent FAQで説明されているように、あなたのアプローチは絶対的に有効です:http : //docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an -集約ステップ
これは、コードに関して最も簡単な方法です。ただし、(a)追加のトピックを管理する必要があり、(b)Kafkaにデータが書き込まれ、Kafkaからデータが再読み取りされるため、ネットワークトラフィックが増えるという欠点があります。
「ダミーリデュース」を使用する1つの代替方法があります。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream
KTable<String, Long> table = stream.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
"dummy-aggregation-store");
このアプローチは、オプション1に比べてコードに関しては多少複雑ですが、(a)手動によるトピック管理が不要であり、(b)Kafkaからデータを再度読み取る必要がないという利点があります。
全体として、あなたは自分で決める必要があります。
オプション2では、Kafka Streamsは内部の変更ログトピックを作成して、フォールトトレランスのためにKTableをバックアップします。したがって、どちらのアプローチでも、Kafkaに追加のストレージが必要になり、ネットワークトラフィックが増加します。全体として、これはオプション2のやや複雑なコードとオプション1の手動によるトピック管理の間のトレードオフです。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加