重试Deadly Queue for Kafka中的消息的最佳实践是什么

阿鲁普

我们正在将Kafka用作微服务之间的消息传递系统。我们有一个kafka使用者,他正在听一个特定的主题,然后将数据发布到另一个主题,由Kafka Connector负责,后者负责将其发布到某些数据存储中。

我们使用Apache Avro作为序列化机制。

我们需要使DLQ能够为Kafka Consumer和Kafka Connector添加容错功能。

由于多种原因,任何消息都可能移至DLQ:

  1. 格式错误
  2. 不良数据
  3. 通过大量消息进行限制,因此某些消息可能移至DLQ
  4. 由于连通性,发布到数据存储失败。

对于上述第3点和第4点,我们想再次尝试来自DLQ的消息。

相同的最佳做法是什么。请指教。

斯文德

仅推送到导致不可重试错误的DLQ记录,即:示例中的点1(错误格式)和点2(错误数据)。对于DLQ记录的格式,一种好的方法是:

  • 将kafka记录值和键推入DLQ,使其与原始记录完全相同,请勿将其包装在任何信封中。这使得在故障排除过程中使用其他工具进行重新处理变得更加容易(例如,使用新版本的解串器等)。
  • 添加一堆Kafka标头来传达有关该错误的元数据,一些典型示例如下:
    • 该记录的原始主题名称,分区,偏移量和Kafka时间戳
    • 异常或错误消息
    • 无法处理该记录的应用程序的名称和版本
    • 错误时间

通常,每个服务或应用程序使用一个DLQ主题(而不是每个入站主题使用一个,不是跨服务共享一个)。这倾向于使事情保持独立和可管理。

哦,您可能想对DLQ主题的入站流量进行一些监视和警报;)

恕我直言,第3点(高音量)应该通过某种自动缩放而不是DLQ来处理。尝试始终高估(一点)输入主题的分区数,因为可以启动服务的最大实例数受此限制。消息数量过多不会使您的服务超载,因为Kafka使用者在决定时会明确轮询更多消息,因此他们所要求的永远不会超过应用程序可以处理的数量。如果出现消息高峰,将会发生什么,只是它们将继续堆积在上游kafka主题中。

由于错误是暂时的,因此应该直接从源主题重试第4点(连接性),而无需涉及任何DLQ。将消息拖放到DLQ并接收下一个消息不会解决任何问题,因为好吧,连接性问题仍然存在,下一条消息也可能会被丢弃。读取或不读取来自Kafka的记录并不会使其消失,因此存储在该处的记录以后很容易再次读取。仅当服务成功将结果记录写入出站主题时,才可以对服务进行编程以前进至下一个入站记录(请参阅Kafka事务:读取主题实际上涉及写操作) 操作,因为新的使用者偏移量需要保留,因此您可以告诉您的程序将新的偏移量和输出记录保留为同一原子事务的一部分)。

与消息队列相比,Kafka更像是一个存储系统(仅执行2个操作:顺序读取和顺序写入),它擅长于持久性,数据复制,吞吐量,规模...(...和炒作;))。如“事件源”中那样,它对于将数据表示为事件序列确实非常有用。如果此微服务设置的需求主要是异步点对点消息传递,并且如果大多数情况宁愿使用超低延迟并选择丢弃消息而不是重新处理旧消息(如所列4点所示),则可能是像Redis队列这样的有损内存排队系统是否更合适?

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用 Retrofit 时重试机制的最佳实践是什么?

来自分类Dev

处理 RxSwift 重试和错误处理的最佳实践是什么

来自分类Dev

构建异常消息的最佳实践是什么?

来自分类Dev

Kafka中的Worker Queue选项

来自分类Dev

重试每条在Kafka中失败的已消耗消息的退避时间

来自分类Dev

Fopen中重试的最佳控制

来自分类Dev

Vuejs中mixin的最佳实践是什么?

来自分类Dev

在C ++中定义的最佳实践是什么?

来自分类Dev

从C#中的方法返回值或错误消息的最佳实践是什么?

来自分类Dev

在Python / Django中存储UI消息字符串的最佳实践是什么?

来自分类Dev

重试KafkaStreams拓扑中的消息

来自分类Dev

使Kafka消费者在python中存活的最佳实践是什么?

来自分类Dev

用queue()更新图表的模式是什么?

来自分类Dev

用queue()更新图表的模式是什么?

来自分类Dev

在javascript中,该函数返回promise并重试内部异步过程的最佳实践

来自分类Dev

Scrapy:收集重试消息

来自分类Dev

从JBoss JMS Queue检索消息

来自分类Dev

什么是gevent.queue.Channel?

来自分类Dev

AWS Simple Queue Service(SQS)可见性超时-设置最大重试次数?

来自分类Dev

Queue()函数在Chisel中起什么作用?

来自分类Dev

AWS Lambda中的重试

来自分类Dev

在rxJava中重试

来自分类Dev

重试mysql中的死锁

来自分类Dev

实体框架中多个“包含”的最佳实践是什么?

来自分类Dev

在CloudKit中获取CKReferences的最佳实践方法是什么?

来自分类Dev

在MVC中处理静态内容的最佳实践是什么

来自分类Dev

Matlab中浮点比较的最佳实践是什么?

来自分类Dev

MySQL中时区处理的最佳实践是什么?

来自分类Dev

在OData中执行“稳定分页”的最佳实践是什么?

Related 相关文章

热门标签

归档