Kafka Producer Java API未将消息分发到所有主题分区

拉胡尔·古普塔(Rahul Gupta)

我是Kafka的新手,今天我尝试创建Java Producer,以在不同分区上产生有关Kafka主题的消息。

首先,我创建了一个程序包,raggieKafka在该程序包下创建了2个类:TestProducerSimplePartitioner

TestProducer类具有以下代码:

package raggieKafka;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer{

    public static void main(String args[]) throws Exception
    {
        long events = 0;

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        events = Integer.parseInt(reader.readLine());
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("topic.metadata.refresh.interval.ms", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "raggieKafka.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> prod = new Producer<String, String>(config);

        for(long i = 0; i < events; i++)
        {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com, " + ip;
            KeyedMessage<String,String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            prod.send(data);
        }
        prod.close();
    }
}

并且SimplePartitioner类具有以下代码:

package raggieKafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner{

    public SimplePartitioner(VerifiableProperties props)
    {

    }

    public int partition(Object Key, int a_numPartitions)
    {
        int partition = 0;
        String stringKey = (String) Key;
        int offset = stringKey.indexOf(stringKey);

        if(offset > 0)
        {
            partition = Integer.parseInt(stringKey.substring(offset+1)) % a_numPartitions;
        }
        return partition;
    }   
}

在编译这些Java程序之前,我在Kafka Broker上创建了主题:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --create --topic page_visit
s --zookeeper localhost:2181 --partitions 5 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or under
score ('_') could collide. To avoid issues it is best to use either, but not bot
h.
Created topic "page_visits".

现在,当我编译Java程序时,它将所有消息仅放入1个分区,即page_visits-0,在该分区下发布所有消息,而其余所有其他分区保持为空。

有人可以告诉我为什么我的Java生产者不将我所有的消息分发到其他分区吗?

实际上,我查看了google,然后又添加了一个属性:

props.put("topic.metadata.refresh.interval.ms", "1");

但Producer仍未针对所有主题生成消息。

请帮忙。

四月

您的SimplePartitioner代码在以下行中有bug

int offset = stringKey.indexOf(stringKey);

它总是返回,0因此您的偏移量始终等于,0并且它永远不会大于0,因此if块将不会执行。最后,它总是返回您的partition 0

解决方案:由于您的密钥是IP地址,因此以下更改可能会按预期进行。

int offset = stringKey.lastIndexOf('.');

希望这可以帮助!

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Java/Scala Kafka Producer 不向主题发送消息

来自分类Dev

Kafka Java Producer和kerberos

来自分类Dev

Kafka Console Producer发送消息失败

来自分类Dev

Kafka Producer消费者API问题

来自分类Dev

perl Kafka :: Producer,“ Kafka :: Exception :: Producer”,“代码”,-1000,“消息”,“无效的参数

来自分类Dev

Kafka Producer(具有多个实例)写入同一主题

来自分类Dev

Kafka:Java Producer发送消息后,在控制台使用者上未看到任何消息

来自分类Dev

来自Kafka Producer的过多控制台消息

来自分类Dev

Kafka Producer:发送消息后断开连接与保持连接打开

来自分类Dev

Kafka docs Producer可能会丢失消息

来自分类Dev

来自Kafka Producer的过多控制台消息

来自分类Dev

Kafka Producer无法验证没有PK的记录并返回InvalidRecordException

来自分类Dev

Apache Kafka 0.9 Java API消耗主题开头的所有消息

来自分类Dev

Spring-Kafka:如何在Producer Kafka中插入application.yml主题

来自分类Dev

如何使用kafka-console-producer将消息生成到所选分区?

来自分类Dev

Apache Kafka Producer配置错误

来自分类Dev

如何在Maven项目中将依赖项org.apache.kafka.clients.producer.Producer解析为apache kafka的Java生产者

来自分类Dev

Kafka Producer的Java客户端示例,send方法不接受KeyedMessage

来自分类Dev

Java Apache Kafka Producer元数据更新程序和重试逻辑

来自分类Dev

引起:java.io.NotSerializableException:org.apache.kafka.clients.producer.KafkaProducer

来自分类Dev

Kafka Producer Thread,即使没有发送消息,也有大量线程

来自分类Dev

当所有经纪人都关闭时,Spring-Kafka Producer重试

来自分类Dev

Apache Kafka列出所有主题

来自分类Dev

Kafka producer.send()被producer.close()停止

来自分类Dev

Kafka 2.9.2-0.8.1.1没有producer.send的KeyedMessage参数

来自分类Dev

使用java向kafka主题发送消息

来自分类Dev

通过不同的线程使用Kafka Producer

来自分类Dev

如何降低Kafka Producer的写入速度?

来自分类Dev

如何为Kafka Producer示例修复NoClassDefFoundError?

Related 相关文章

  1. 1

    Java/Scala Kafka Producer 不向主题发送消息

  2. 2

    Kafka Java Producer和kerberos

  3. 3

    Kafka Console Producer发送消息失败

  4. 4

    Kafka Producer消费者API问题

  5. 5

    perl Kafka :: Producer,“ Kafka :: Exception :: Producer”,“代码”,-1000,“消息”,“无效的参数

  6. 6

    Kafka Producer(具有多个实例)写入同一主题

  7. 7

    Kafka:Java Producer发送消息后,在控制台使用者上未看到任何消息

  8. 8

    来自Kafka Producer的过多控制台消息

  9. 9

    Kafka Producer:发送消息后断开连接与保持连接打开

  10. 10

    Kafka docs Producer可能会丢失消息

  11. 11

    来自Kafka Producer的过多控制台消息

  12. 12

    Kafka Producer无法验证没有PK的记录并返回InvalidRecordException

  13. 13

    Apache Kafka 0.9 Java API消耗主题开头的所有消息

  14. 14

    Spring-Kafka:如何在Producer Kafka中插入application.yml主题

  15. 15

    如何使用kafka-console-producer将消息生成到所选分区?

  16. 16

    Apache Kafka Producer配置错误

  17. 17

    如何在Maven项目中将依赖项org.apache.kafka.clients.producer.Producer解析为apache kafka的Java生产者

  18. 18

    Kafka Producer的Java客户端示例,send方法不接受KeyedMessage

  19. 19

    Java Apache Kafka Producer元数据更新程序和重试逻辑

  20. 20

    引起:java.io.NotSerializableException:org.apache.kafka.clients.producer.KafkaProducer

  21. 21

    Kafka Producer Thread,即使没有发送消息,也有大量线程

  22. 22

    当所有经纪人都关闭时,Spring-Kafka Producer重试

  23. 23

    Apache Kafka列出所有主题

  24. 24

    Kafka producer.send()被producer.close()停止

  25. 25

    Kafka 2.9.2-0.8.1.1没有producer.send的KeyedMessage参数

  26. 26

    使用java向kafka主题发送消息

  27. 27

    通过不同的线程使用Kafka Producer

  28. 28

    如何降低Kafka Producer的写入速度?

  29. 29

    如何为Kafka Producer示例修复NoClassDefFoundError?

热门标签

归档