Kafka Consumer挂在Java中的.hasNext

曼格斯兄弟

我在Java中有一个简单的Kafka Consumer,带有以下代码

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()&& !done){
            try {
                System.out.println("Parsing data");
                byte[] data = it.next().message();
                System.out.println("Found data: "+data);
                values.add(data); // array list
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        done = true;
    }

发布消息后,将成功读取数据,但是当它返回检查它时。hasNext(),它将保持待处理状态,再也不会返回。

什么会拖延这个?

m_stream是通过以下方式获得的KafkaStream:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
   // m_stream is one of these streams
}
曼格斯兄弟

解决方案是添加属性

“ consumer.timeout.ms”

现在,当达到超时时,将引发ConsumerTimeoutException

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka consumer stuck (in fact pauses and resumes) at iterator.hasNext() even though there are plenty of messages to consume in the topic

来自分类Dev

Java Collections hasnext()方法

来自分类Dev

kafka Java客户端不使用-只是挂在consumer.poll上

来自分类Dev

C ++中的Kafka Consumer

来自分类Dev

Java-使用ListIterator .hasNext()的无限循环

来自分类Dev

如何使用Scanner类中的hasNext()?

来自分类Dev

Scala 中的类型不匹配(hasNext 函数)

来自分类Dev

迭代的.hasNext();的用法 和.next();和 Java中的方法

来自分类Dev

Kafka Consumer-Java实现

来自分类Dev

了解Java Kafka Consumer API

来自分类常见问题

使用Scanner.hasNext()时出现Java NullPointerException

来自分类Dev

hasNext()的Java正则表达式与next的末尾匹配

来自分类Dev

尝试在Java中使用hasNext时出现错误

来自分类Dev

Java迭代器hasNext()方法始终为false

来自分类Dev

使用scanner.hasNext()时的Java NullPointerException异常;

来自分类Dev

hasNext()的Java正则表达式与next的末尾匹配

来自分类Dev

Cursor.hasNext引发java.util.NoSuchElementException

来自分类Dev

如何在 Java 中使用“Scanner.hasNext”完成程序

来自分类Dev

gRPC Java 客户端 - 在 onNext 期间 hasNext?

来自分类Dev

自动标记如何在扫描仪中退出hasNext

来自分类Dev

迭代器中的hasnext()无法按预期工作

来自分类Dev

Kafka Consumer-Java(0.9 API)

来自分类Dev

如何在Scala中实施Kafka Consumer

来自分类Dev

如何处理 Kafka Consumer 中的错误

来自分类Dev

Java System.in中的hasNext方法仅在首次调用时起作用

来自分类Dev

while(input.hasNext())内部的多个input.next()触发Java中的NoSuchElementException

来自分类Dev

Kafka Consumer poll and reconnection

来自分类Dev

尽管主题中有很多消息需要使用,但Kafka消费者仍停留在iterator.hasNext()上(实际上是暂停并恢复)

来自分类Dev

流到迭代器的Java 8迭代器导致对hasNext()的冗余调用

Related 相关文章

  1. 1

    Kafka consumer stuck (in fact pauses and resumes) at iterator.hasNext() even though there are plenty of messages to consume in the topic

  2. 2

    Java Collections hasnext()方法

  3. 3

    kafka Java客户端不使用-只是挂在consumer.poll上

  4. 4

    C ++中的Kafka Consumer

  5. 5

    Java-使用ListIterator .hasNext()的无限循环

  6. 6

    如何使用Scanner类中的hasNext()?

  7. 7

    Scala 中的类型不匹配(hasNext 函数)

  8. 8

    迭代的.hasNext();的用法 和.next();和 Java中的方法

  9. 9

    Kafka Consumer-Java实现

  10. 10

    了解Java Kafka Consumer API

  11. 11

    使用Scanner.hasNext()时出现Java NullPointerException

  12. 12

    hasNext()的Java正则表达式与next的末尾匹配

  13. 13

    尝试在Java中使用hasNext时出现错误

  14. 14

    Java迭代器hasNext()方法始终为false

  15. 15

    使用scanner.hasNext()时的Java NullPointerException异常;

  16. 16

    hasNext()的Java正则表达式与next的末尾匹配

  17. 17

    Cursor.hasNext引发java.util.NoSuchElementException

  18. 18

    如何在 Java 中使用“Scanner.hasNext”完成程序

  19. 19

    gRPC Java 客户端 - 在 onNext 期间 hasNext?

  20. 20

    自动标记如何在扫描仪中退出hasNext

  21. 21

    迭代器中的hasnext()无法按预期工作

  22. 22

    Kafka Consumer-Java(0.9 API)

  23. 23

    如何在Scala中实施Kafka Consumer

  24. 24

    如何处理 Kafka Consumer 中的错误

  25. 25

    Java System.in中的hasNext方法仅在首次调用时起作用

  26. 26

    while(input.hasNext())内部的多个input.next()触发Java中的NoSuchElementException

  27. 27

    Kafka Consumer poll and reconnection

  28. 28

    尽管主题中有很多消息需要使用,但Kafka消费者仍停留在iterator.hasNext()上(实际上是暂停并恢复)

  29. 29

    流到迭代器的Java 8迭代器导致对hasNext()的冗余调用

热门标签

归档