我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们。在接收kakfa有效负载时,我会进行反序列化,并且如果有异常,我只想记录该有效负载(通过保存到mongo),然后继续接收其他有效负载。
为此,我正在使用以下方法-
@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
flux.delayUntil(//some function to do something)
.doOnNext(r -> r.receiverOffset().acknowledge())
.onErrorResume(this::handleException()) // here I'll just save to mongo
.subscribe();
}
}
private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
// save to mongo
return Flux.empty();
}
在这里,我希望每当接收有效负载时遇到异常时,onErrorResume都应捕获该异常并登录到mongo,然后当我发送到kafka队列时,我应该继续接收更多消息。但是,我看到在异常之后,即使调用了onErrorResume方法,但我也无法处理发送给Kakfa主题的消息。我在这里可能会缺少任何东西吗?
就像@bsideup提到的那样,我最终也没有从反序列化器中抛出异常,因为kafka无法为该记录提交偏移量,并且没有一种干净的方法来忽略该记录并继续使用记录因为我们没有记录的偏移量信息(因为其格式错误)。因此,即使我尝试使用反应式错误运算符忽略记录,民意调查也会获取相同的记录,从而使使用者陷入困境
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句