使用onErrorResume使用Reactor Kafka处理发布到Kafka的有问题的有效负载

我正在使用反应堆卡夫卡发送卡夫卡消息并接收和处理它们。在接收kakfa有效负载时,我会进行反序列化,并且如果有异常,我只想记录该有效负载(通过保存到mongo),然后继续接收其他有效负载。

为此,我正在使用以下方法-

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
       flux.delayUntil(//some function to do something)
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException()) // here I'll just save to mongo 
       .subscribe();
   }
}


private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
 // save to mongo
 return Flux.empty();
}

在这里,我希望每当接收有效负载时遇到异常时,onErrorResume都应捕获该异常并登录到mongo,然后当我发送到kafka队列时,我应该继续接收更多消息。但是,我看到在异常之后,即使调用了onErrorResume方法,但我也无法处理发送给Kakfa主题的消息。我在这里可能会缺少任何东西吗?

就像@bsideup提到的那样,我最终也没有从反序列化器中抛出异常,因为kafka无法为该记录提交偏移量,并且没有一种干净的方法来忽略该记录并继续使用记录因为我们没有记录的偏移量信息(因为其格式错误)。因此,即使我尝试使用反应式错误运算符忽略记录,民意调查也会获取相同的记录,从而使使用者陷入困境

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

来自分类Dev

使用jQuery发布Json时,发布的数据与有效负载不同吗?

来自分类Dev

使用有效负载创建操作

来自分类Dev

通过使用scrapy中的请求有效负载来发布请求

来自分类Dev

使用 AngularJs 和 PHP 发布有效负载并接收响应

来自分类Dev

使用 XmlHttpRequest/ajax 发布表单会改变有效负载

来自分类Dev

Flink-如何与TimestampAssigner一起使用从事件有效负载中获取时间(不使用Kafka时间戳)

来自分类Dev

使用哪个REST命令从有效负载接收数据,对其进行处理并返回结果?

来自分类Dev

使用Restangular使用有效负载进行PUT / GET

来自分类Dev

推送通知使用有效负载打开正确的页面

来自分类Dev

使用nfqueue / scapy更改TCP有效负载

来自分类Dev

在POST有效负载中使用环境变量

来自分类Dev

使用apigee代理返回大有效负载

来自分类Dev

使用PHP转换WooCommerce Webhook有效负载

来自分类Dev

使用放心创建动态xml有效负载

来自分类Dev

无法使用官方gem将有效请求发布到Authorize.net终结点

来自分类Dev

Powershell:无法使用其他有效脚本将其发布到Slack

来自分类Dev

使用发布配置文件部署到 Azure:使用 Jenkins 在 VS2017 上发布失败时有效

来自分类Dev

在WSO2 ESB 4.9.0中使用有效负载工厂生成JavaScript有效负载

来自分类Dev

在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

来自分类Dev

在发布PubNub聊天消息有效负载时,推送通知正在接收,但是后台获取未使用pn_apns标签调用

来自分类Dev

如何使用带有DRF序列化程序的camelCase属性生成有效负载

来自分类Dev

带有 JSON 有效负载正文的 HTTP 帖子,其中包含使用 cURL 的空行

来自分类Dev

使用javascript画布进行有效的像素处理

来自分类Dev

使用Python函数有效处理DataFrame行?

来自分类Dev

使用Python函数有效处理DataFrame行?

来自分类Dev

解决范围问题以有效使用chrome.storage

来自分类Dev

使用Kafka REST代理发布Avro消息会引发“将JSON转换为Avro失败”

来自分类Dev

iOS-APNS如何在有效负载中使用loc_key

Related 相关文章

  1. 1

    使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

  2. 2

    使用jQuery发布Json时,发布的数据与有效负载不同吗?

  3. 3

    使用有效负载创建操作

  4. 4

    通过使用scrapy中的请求有效负载来发布请求

  5. 5

    使用 AngularJs 和 PHP 发布有效负载并接收响应

  6. 6

    使用 XmlHttpRequest/ajax 发布表单会改变有效负载

  7. 7

    Flink-如何与TimestampAssigner一起使用从事件有效负载中获取时间(不使用Kafka时间戳)

  8. 8

    使用哪个REST命令从有效负载接收数据,对其进行处理并返回结果?

  9. 9

    使用Restangular使用有效负载进行PUT / GET

  10. 10

    推送通知使用有效负载打开正确的页面

  11. 11

    使用nfqueue / scapy更改TCP有效负载

  12. 12

    在POST有效负载中使用环境变量

  13. 13

    使用apigee代理返回大有效负载

  14. 14

    使用PHP转换WooCommerce Webhook有效负载

  15. 15

    使用放心创建动态xml有效负载

  16. 16

    无法使用官方gem将有效请求发布到Authorize.net终结点

  17. 17

    Powershell:无法使用其他有效脚本将其发布到Slack

  18. 18

    使用发布配置文件部署到 Azure:使用 Jenkins 在 VS2017 上发布失败时有效

  19. 19

    在WSO2 ESB 4.9.0中使用有效负载工厂生成JavaScript有效负载

  20. 20

    在kafka主题中创建多个分区,并使用kafka-node将消息发布到所有分区

  21. 21

    在发布PubNub聊天消息有效负载时,推送通知正在接收,但是后台获取未使用pn_apns标签调用

  22. 22

    如何使用带有DRF序列化程序的camelCase属性生成有效负载

  23. 23

    带有 JSON 有效负载正文的 HTTP 帖子,其中包含使用 cURL 的空行

  24. 24

    使用javascript画布进行有效的像素处理

  25. 25

    使用Python函数有效处理DataFrame行?

  26. 26

    使用Python函数有效处理DataFrame行?

  27. 27

    解决范围问题以有效使用chrome.storage

  28. 28

    使用Kafka REST代理发布Avro消息会引发“将JSON转换为Avro失败”

  29. 29

    iOS-APNS如何在有效负载中使用loc_key

热门标签

归档