我建立了一个由1个生产者和1个消费者使用的Kafka解决方案,并验证了所有连接都是正确的(我可以产生消息并使用它们)。ZK Server和Kakfa Server已启动且稳定。
如前所述,我的问题是,使用者可以从停止的地方读取消息,但是只能读取在开始读取之前创建的消息。此后,直到我杀死使用者并重新启动他之后,才会读取新消息。
相关的消费者Scala代码
val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
val filterSpec = new Whitelist("some-valid-topic")
val stream: KafkaStream[String, String] =
consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
log.info(s"Consumer started. Listening to topics [$filterSpec].")
def read() = stream map digest
摘要在其中使用MessageAndMetadata并从中获得乐趣
def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
log.info(s"processing the message [$messageAndMeta]")
属性是
properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)
我可以复制的时间表
有什么想法吗?谢谢。
问题是我忽略了使我的消费者崩溃的ConsumerTimeoutException,并将其误认为是“消费者永远悬挂”。
从有关使用者配置的文档中:
默认情况下,此值为-1,如果没有新消息可供使用,则使用者将无限期阻塞。
我将其设置为若干秒,然后将其抛出。通过将其设置为-1,我可以获得所需的行为,尽管(针对我的用例)理想的解决方案是按照此项目的方式实施一些操作:https : //github.com/kciesielski/reactive-kafka
希望它可以帮助别人。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句