私の現在のプロジェクトでは、Kafka、Kafka Connect、Elasticsearchを使用してデータパイプラインを作成しました。データは最終的にトピック「signal-topic」になり、フォームから外れます
KeyValue<id:String, obj:Signal>
今、私はKafka Streamsを導入して、KafkaからElasticsearchへの途中でデータの処理を実行できるようにしようとしています。
私の最初の目標は、さまざまな種類の副次情報を使用してデータを拡張できるようにすることです。典型的なシナリオは、データにすでに存在するいくつかの情報に基づいて、データに別のフィールドを添付することです。たとえば、データには「rawevent」フィールドが含まれており、それに基づいて「event-description」を追加してから、別のトピックに出力したいと思います。
これを実装する「正しい」方法は何でしょうか?
私はマビーがカフカの別のトピックに関するサイドデータを持っていることを考えていました
KeyValue<rawEvent:String, eventDesc:String>
ストリームが2つのトピックに参加していますが、それを実現する方法がわかりません。
これは可能でしょうか?私が遭遇したすべての例では、データソースのキーが同じである必要があるようです。私のものはそうではないので、それが可能かどうかはわかりません。誰かがこれをどのように行うことができるかについてのスニペットを持っているなら、それは素晴らしいでしょう。
前もって感謝します。
2つの可能性があります。
rawEvent
から抽出しSignal
て新しいキーとして設定し、に対して結合を行うことができKTable<rawEvent:String, eventDesc:String>
ます。何かのようなものKStream#selectKey(...)#join(KTable...)
rawEvent
、結合を計算するためにGlobalKTableルックアップを実行するために使用されるKStream(この場合)から非キー結合属性を抽出できます。KStream-KTable結合は時間どおりに同期されますが、KStream-GlobalKTable結合は同期されないため、両方の結合は異なるセマンティクスを提供することに注意してください。詳細については、このブログ投稿を確認してください:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加