Kafka Consumer会先阅读直到开始,然后永远挂起

g

我建立了一个由1个生产者和1个消费者使用的Kafka解决方案,并验证了所有连接都是正确的(我可以产生消息并使用它们)。ZK Server和Kakfa Server已启动且稳定。

如前所述,我的问题是,使用者可以从停止的地方读取消息,但是只能读取在开始读取之前创建的消息。此后,直到我杀死使用者并重新启动他之后,才会读取新消息。

相关的消费者Scala代码

  val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
  val filterSpec = new Whitelist("some-valid-topic")

  val stream: KafkaStream[String, String] =
    consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

  log.info(s"Consumer started. Listening to topics [$filterSpec].")

  def read() = stream map digest

摘要在其中使用MessageAndMetadata并从中获得乐趣

def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
    log.info(s"processing the message [$messageAndMeta]")

属性是

properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)

我可以复制的时间表

  • 产生5条信息
  • 开始消费
  • 消费者阅读5条消息
  • 再产生15条消息
  • 消费者忽略新消息并永久挂起
  • 杀死并重启消费者
  • 消费者阅读15条消息,然后再次挂起

有什么想法吗?谢谢。

g

问题是我忽略了使我的消费者崩溃的ConsumerTimeoutException,并将其误认为是“消费者永远悬挂”。

从有关使用者配置的文档中:

默认情况下,此值为-1,如果没有新消息可供使用,则使用者将无限期阻塞。

我将其设置为若干秒,然后将其抛出。通过将其设置为-1,我可以获得所需的行为,尽管(针对我的用例)理想的解决方案是按照此项目的方式实施一些操作:https : //github.com/kciesielski/reactive-kafka

这个线程向我指出了正确的方向

希望它可以帮助别人。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Kafka如何从__consumer_offsets主题阅读

来自分类Dev

从头开始阅读Kafka主题

来自分类Dev

Kafka Consumer poll and reconnection

来自分类Dev

C ++中的Kafka Consumer

来自分类Dev

Kafka Consumer-Java实现

来自分类Dev

Kafka Consumer的poll()方法被阻止

来自分类Dev

了解Java Kafka Consumer API

来自分类Dev

来自Kafka Consumer的Spark Streaming

来自分类Dev

整合kafka Consumer春季批次

来自分类Dev

“ Kafka Spout”和“ kafka Consumer”之间有什么区别?

来自分类Dev

Storm Kafka Spout无法阅读最后阅读

来自分类Dev

阅读Apache Kafka中的多个主题

来自分类Dev

Kafka 消费者阅读速度太慢

来自分类Dev

如何在Scala中实施Kafka Consumer

来自分类Dev

Kafka Consumer-Java(0.9 API)

来自分类Dev

Kafka Consumer挂在Java中的.hasNext

来自分类Dev

不要打印kafka-console-consumer警告

来自分类Dev

随机生成的Kafka Consumer组ID

来自分类Dev

Kafka Consumer - 随机访问获取分区范围

来自分类Dev

如何处理 Kafka Consumer 中的错误

来自分类Dev

Spring Kafka会延迟使用记录

来自分类Dev

哪个版本的 Kafka Stream 会更高效?

来自分类Dev

用kafka火花:NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava / util / Collection;)

来自分类Dev

kafka kafka-consumer-groups.sh --describe不返回消费者组的输出

来自分类Dev

将Spark DataFrame写入Kafka会忽略分区列和kafka.partitioner.class

来自分类Dev

Apache Spark Streaming-Kafka-阅读较旧的消息

来自分类Dev

阅读更多的Kafka主题,而不是CPU内核数

来自分类Dev

阅读Spark批处理作业中的Kafka主题

来自分类Dev

在Kafka中阅读邮件时重新平衡问题