使用Shopify Sarama的Kafka错误处理

梦m以求的

因此,我尝试将Kafka用于我的应用程序,该应用程序的生产者将操作记录到Kafka MQ中,而使用者将其从MQ读取。由于我的应用程序位于Go中,因此我使用Shopify Sarama来实现这一点。

现在,我可以读取MQ并使用以下命令打印消息内容

fmt.Printf

Howeveer,我真的希望错误处理要好于控制台打印,并且我愿意付出更多努力。

现在用于消费者连接的代码:

mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // Don't want to panic when error occurs, instead handle it
}

以及消息的处理:

    go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
    }
}()

我的问题(我是测试Kafka的新手,而对于kafka则是新手):

  1. 上面的程序中哪里可能会发生错误,以便我进行处理?任何示例代码对我来说都是很棒的。我可能想到的错误条件是,当msgContent在JSON中实际上不包含任何Type of ContentId字段时。

  2. 在kafka中,是否存在使用者试图以当前偏移量读取但由于某种原因而无法读取的情况(即使JSON格式正确)?我的消费者是否有可能回溯说失败的偏移量以上x步并重新处理偏移量?还是有更好的方法来做到这一点?同样,这些情况可能是什么?

我愿意阅读和尝试事物。

嗅探器

关于1)检查下面我在哪里记录错误消息。这或多或少是我要做的。

关于2),我不了解尝试向后退的话题。只需反复创建一个消费者,其起始偏移量每次减一个,就很有可能。但我不建议这样做,因为您很可能最终会一遍又一遍地重放相同的消息。我建议您经常保存偏移量,以便在情况向南的情况下可以恢复。

以下是我认为可以解决您大多数问题的代码。我没有尝试编译它。而且sarama api最近有所更改,因此api当前可能有所不同。

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {
    wg.Add(1)
    go func(){
        defer wg.Done()
        //to track the last known good offset we processed, which is 
        // updated after each successfully processed event. 
        saveprogress := func(off int64){
            //Save the offset somewhere...a file... 
            //Ive also used kafka to store progress 
            //using a special topic as a WAL
        }
        defer saveprogress(lastgoodoffset)

        client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())
        if err != nil {
            log.Error(err)
            return
        }
        defer client.Close()
        sarama.NewConsumerConfig()
        consumerConfig.OffsetMethod = sarama.OffsetMethodManual
        consumerConfig.OffsetValue = int64(lastgoodoff)
        consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)
        if err != nil {
            log.Error(err)
            return
        }
        defer consumer.Close()
        for {
            select {
            case event := <-consumer.Events():
                if event.Err != nil {
                    log.Error(event.Err)
                    return
                }
                msgContent := &Message{}
                err = json.Unmarshal(message.Value, msgContent)
                if err != nil {
                    log.Error(err)
                    continue //continue to skip this message or return to stop without updating the offset.
                }
                // Send the message on to be processed.
                out <- msgContent 

                lastgoodoff = event.Offset
            }
        }
    }()
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spring Kafka-使用哪个批处理错误处理程序?

来自分类Dev

如何使用Bluebird错误处理程序?

来自分类Dev

使用请求时的 Python 错误处理

来自分类Dev

Spring JavaConfig:使用WebApplicationInitializer进行错误处理

来自分类Dev

如何使用页面错误处理程序映射页面?

来自分类常见问题

使用node.js流进行错误处理

来自分类Dev

使用Multer与ExpressJS上传文件时的错误处理

来自分类Dev

使用Angular2异步管道进行错误处理

来自分类Dev

使用期货在Scala中进行错误处理

来自分类Dev

使用try语句的data.table和错误处理

来自分类Dev

使用Range.Find()VBA进行错误处理

来自分类Dev

fpinscala书:使用选项类型的错误处理

来自分类Dev

使用外部DLL时PL / I中的错误处理?

来自分类Dev

在Laravel中使用try和catch进行错误处理

来自分类Dev

Ajax错误处理无法与CORS一起使用

来自分类Dev

Swift 2中使用Alamofire进行错误处理

来自分类Dev

如何使用RSpec测试Mongo连接错误处理

来自分类Dev

使用Promises进行猫鼬错误处理

来自分类Dev

如何使用express.Router实例进行错误处理

来自分类Dev

使用Observable SelectMany的Reactive Extensions错误处理

来自分类Dev

在C中使用Scala的Option或Rust的Result错误处理

来自分类Dev

在Django中使用try除外进行错误处理

来自分类Dev

在Swift中使用JSONDecoder进行错误处理

来自分类Dev

Powershell:使用-asjob参数时的错误处理

来自分类Dev

如何使用页面错误处理程序映射页面?

来自分类Dev

使用Swift语法进行块中的错误处理

来自分类Dev

在存储过程中使用错误处理

来自分类Dev

混合使用MVC和Web API错误处理

来自分类Dev

在 C 中使用 goto 进行错误处理的奇怪行为

Related 相关文章

热门标签

归档