CSVファイルをストリーミングしたKafkaトピックTranfer_Historyがあります。ここで、各PARTY_IDの発生をカウントしたいと思います。次に、変換を適用した後:カウントが20未満の場合は、新しいトピックCHURNに配置し、20を超える場合は、トピックLOYALに配置します。#JAVAを使用しています。
パブリッククラスFirstFilterer {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
/*input messages example
{"155555","11.11.2016 11:12}
{"155555","11.11.2016 13:12}
{"155556","11.11.2016 13:12}
result to be achived:
{"155555","2"}
{"155556","1"}
*/
builder.stream("test_topic_3")
// .map()
.groupByKey()
// .windowedBy(Window) // This may or may not be required
.count()
.toStream()
.map(
(key,count) -> new KeyValue<>(key.toString(),count)
)
.to("test_output_filtered_3");//this topic is empty after the run
私はカフカに不慣れです多くのplzが私を助けてくれることを知りません
これは、KafkaStreamsを介して非常に簡単に実現できます。まず、KStreamとKTableのバックグラウンドがあることを確認します。以下の手順に従う必要があります。
builder.<Keytype, ValueType>stream(YourInputTopic))
.map()
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10))) // This may or may not be required
in your case
.count()
.toStream()
.map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(),count.toString()))
.filter((k,v)-> Long.parseLong(v) > 20) // This is the filter
.to(TopicName));
注:これは、この偉業を達成する方法のアイデアを与える単なる擬似コードです
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加