在Apache Kafka中延迟使用消费者的消息

安基塔

我正在使用Kafka 0.8.0并尝试实现以下提到的方案。

JCA API(充当生产者并向其发送数据)--->消费者------> HBase

使用JCA客户端获取数据后,我便立即将每条消息发送给使用者。例如,一旦生产者发送消息号1,我就想从消费者那里获取相同的消息并“放入” HBase。但是我的使用者在经过一些随机的n条消息后开始获取消息。我想使生产者和消费者同步,以便他们两者开始一起工作。

我用过了:

1个经纪人

1个主题

1个单一生产者和高级消费者

谁能建议我需要做些什么来达到同样的目的?

编辑:

添加一些相关的代码片段。

消费者.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

生产者

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type userdefined Object .
        producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

消费者对前10条消息的行为方式是这样的,它不会使消费者收到的消息失效,但是从第11条消息开始,它将开始正常运行。

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

编辑:在生产者向消费者发送消息的地方添加侦听器功能。我正在使用默认的生产者配置没有覆盖它

public synchronized void onValueChanged(final MonitorEvent event_) {


    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();

        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>         
                    (KafkaProperties.topic,new Signal(messageNo)));
        System.out.println("producer sending msg"+messageNo);

        messageNo++;


    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
德米特里
  1. 尝试添加props.put("request.required.acks", "1")到生产者配置。默认情况下,生产者不等待确认,也不保证消息传递。因此,如果在测试之前就启动代理,则生产者可能会在代理完全初始化之前开始发送消息,并且前几条消息可能会丢失。

  2. 尝试添加props.put("auto.offset.reset", "smallest")到使用者配置。它等于--from-beginningkafka-console-consumer.sh的选项。如果您的使用者启动时间比生产者晚,并且Zookeeper中没有保存偏移量数据,则默认情况下它将仅使用新消息(请参阅文档中的使用者配置)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

轮询前使用本地Apache Kafka消费者api过滤消息

来自分类Dev

消费者组中的Apache Beam KafkaIO消费者正在阅读相同的消息

来自分类Dev

Apache kafka高级消费者-了解

来自分类Dev

Apache kafka 消费者停止和启动之间丢失的消息

来自分类Dev

Apache Kafka 消费者-生产者混淆

来自分类Dev

Apache Kafka 2.0 获得消费者滞后

来自分类Dev

Apache Nifi - 如何指定分区以从 Kafka 消费者中读取数据

来自分类Dev

Apache Kafka- 用于由单个消费者从同一主题的不同分区提取消息的算法/策略

来自分类Dev

Seda在Apache骆驼中的并发消费者

来自分类Dev

Apache Kafka Streams API 或生产者/消费者 API

来自分类Dev

Apache Kafka 消费者组和在 Kubernetes 上运行的微服务,它们兼容吗?

来自分类Dev

kafka如何确定单个消费者组中的哪个消费者阅读消息?

来自分类Dev

我们可以在 apache kafka 消费者中从 spring boot rest api 应用程序调用服务方法吗?

来自分类Dev

通过Apache Camel从Kafka消费Avro消息

来自分类Dev

Apache Camel Producer消费者术语难题

来自分类Dev

分析来自 Kafka 消费者的消息

来自分类Dev

Tomcat 中的 Kafka 消费者关闭

来自分类Dev

Kafka 中的 LogCompaction 和消费者

来自分类Dev

Spring Boot Kafka消息消费者和丢失的消息

来自分类Dev

在Apache Active MQ的消费者端获取发布者详细信息

来自分类Dev

在Kafka中,如何让消费者从本地分区消费?

来自分类Dev

Kafka没有向消费者发送足够的消息

来自分类Dev

Kafka消费者group.id多条消息

来自分类Dev

kafka 消费者没有显示消息?

来自分类Dev

Kafka消费者轮询最新消息

来自分类Dev

消费者运行时在Confluent.Kafka中使用主题消息

来自分类Dev

如何使用Apache Kafka实现延迟排队?

来自分类Dev

Apache的卡夫卡得到关于特定主题的消费者名单

来自分类Dev

Kafka使用者在kafka.apache.org上运行示例时未收到消息

Related 相关文章

  1. 1

    轮询前使用本地Apache Kafka消费者api过滤消息

  2. 2

    消费者组中的Apache Beam KafkaIO消费者正在阅读相同的消息

  3. 3

    Apache kafka高级消费者-了解

  4. 4

    Apache kafka 消费者停止和启动之间丢失的消息

  5. 5

    Apache Kafka 消费者-生产者混淆

  6. 6

    Apache Kafka 2.0 获得消费者滞后

  7. 7

    Apache Nifi - 如何指定分区以从 Kafka 消费者中读取数据

  8. 8

    Apache Kafka- 用于由单个消费者从同一主题的不同分区提取消息的算法/策略

  9. 9

    Seda在Apache骆驼中的并发消费者

  10. 10

    Apache Kafka Streams API 或生产者/消费者 API

  11. 11

    Apache Kafka 消费者组和在 Kubernetes 上运行的微服务,它们兼容吗?

  12. 12

    kafka如何确定单个消费者组中的哪个消费者阅读消息?

  13. 13

    我们可以在 apache kafka 消费者中从 spring boot rest api 应用程序调用服务方法吗?

  14. 14

    通过Apache Camel从Kafka消费Avro消息

  15. 15

    Apache Camel Producer消费者术语难题

  16. 16

    分析来自 Kafka 消费者的消息

  17. 17

    Tomcat 中的 Kafka 消费者关闭

  18. 18

    Kafka 中的 LogCompaction 和消费者

  19. 19

    Spring Boot Kafka消息消费者和丢失的消息

  20. 20

    在Apache Active MQ的消费者端获取发布者详细信息

  21. 21

    在Kafka中,如何让消费者从本地分区消费?

  22. 22

    Kafka没有向消费者发送足够的消息

  23. 23

    Kafka消费者group.id多条消息

  24. 24

    kafka 消费者没有显示消息?

  25. 25

    Kafka消费者轮询最新消息

  26. 26

    消费者运行时在Confluent.Kafka中使用主题消息

  27. 27

    如何使用Apache Kafka实现延迟排队?

  28. 28

    Apache的卡夫卡得到关于特定主题的消费者名单

  29. 29

    Kafka使用者在kafka.apache.org上运行示例时未收到消息

热门标签

归档