How to count the frequency of each ID in a Kafka topic

Fury Fazu

I have a Kafka topic, Tranfer_History, into which I streamed a CSV file. Now I want to count the occurrences of each PARTY_ID. After that I want to apply a transformation: if the count is less than 20, write the record to a new topic, CHURN, and if it is greater than 20, write it to the topic LOYAL. I am using Java.

public class FirstFilterer {

    public static void main(String[] args) {

        final StreamsBuilder builder = new StreamsBuilder();

        /* Input messages example:
         {"155555","11.11.2016 11:12"}
         {"155555","11.11.2016 13:12"}
         {"155556","11.11.2016 13:12"}
         Result to be achieved:
         {"155555","2"}
         {"155556","1"}
         */
        builder.stream("test_topic_3")
//              .map()
                .groupByKey()
//              .windowedBy(Window) // This may or may not be required
                .count()
                .toStream()
                .map((key, count) -> new KeyValue<>(key.toString(), count))
                .to("test_output_filtered_3"); // this topic is empty after the run
    }
}

I am new to Kafka and don't know much about it yet, so please help me out.

Sammy

This can be achieved quite easily with Kafka Streams. First, make sure you have some background in KStream and KTable. Then follow the steps below.

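 builder.<Keytype, ValueType>stream(YourInputTopic)
     // Re-key each record by the ID you want to count.
     // extractId is a hypothetical helper standing in for your own parsing logic.
     .map((key, value) -> KeyValue.pair(extractId(value), value))
     .groupByKey()
     // Count per 10-second window. This may or may not be required in your case.
     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
     .count()
     .toStream()
     // Unwrap the windowed key and turn the count into a String value
     .map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(), count.toString()))
     .filter((k, v) -> Long.parseLong(v) > 20) // This is the filter
     .to(TopicName);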

Note: This is just pseudocode to give you an idea of how to achieve this.
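
The pseudocode above keeps only counts above 20 for a single output topic, while the question asks for two topics. As a rough illustration of that count-then-route rule, here is a minimal plain-Java sketch with no Kafka dependency, so it can be run on its own. The class and method names (`PartyCountRouter`, `countByPartyId`, `topicFor`) are made up for this example. The question does not say what should happen when the count is exactly 20, so sending that case to LOYAL is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartyCountRouter {

    // Threshold taken from the question: below 20 -> CHURN, above 20 -> LOYAL.
    static final long THRESHOLD = 20;

    /** Counts how often each PARTY_ID occurs, keeping first-seen order. */
    static Map<String, Long> countByPartyId(List<String> partyIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String id : partyIds) {
            counts.merge(id, 1L, Long::sum); // start at 1, then add 1 per repeat
        }
        return counts;
    }

    /** Picks the output topic for a count. Exactly 20 -> LOYAL is an assumption. */
    static String topicFor(long count) {
        return count < THRESHOLD ? "CHURN" : "LOYAL";
    }

    public static void main(String[] args) {
        // PARTY_IDs from the example messages in the question
        List<String> ids = List.of("155555", "155555", "155556");
        countByPartyId(ids).forEach((id, count) ->
            System.out.println(id + " -> " + count + " -> " + topicFor(count)));
    }
}
```

In a Kafka Streams topology the same rule would likely be applied to the stream of count updates, for example as the predicate of a `filter` in front of each output topic. Keep in mind that a running count typically emits an update each time a new record arrives for an ID, so an ID that ends up in LOYAL may first show up in CHURN with its lower counts unless the counting is windowed or the updates are otherwise held back.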

