我对Kafka的Spark结构化流媒体有疑问。假设我正在执行紧急工作,并且一切正常。一天,我的火花作业失败,原因是送入kafka的数据不一致。不一致可能是诸如数据格式问题或无法处理的垃圾字符之类的东西。在这种情况下,我们如何解决该问题?有没有办法进入kafka主题并手动更改数据?
如果我们不解决数据问题并重新启动Spark作业,它将读取导致失败的同一行,因为我们尚未提交检查点。所以我们如何摆脱这个循环。如何修复Kafka主题中的数据问题以恢复中止的Spark作业?
除非您真的知道自己在做什么,否则我将避免尝试在Kafka主题中手动更改一条消息。
为防止将来发生这种情况,您可能要考虑对数据使用架构(与架构注册表结合)。
为了缓解您描述的问题,我看到以下选项:
使用Sparks结构化流时,Spark会自动设置使用者组。根据代码,消费者组将定义为:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
您可以使用该kafka-consumer-groups
工具更改偏移量。首先通过以下方式确定消费者组的实际名称
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
然后为特定主题的该消费者组设置偏移量(例如,偏移量100)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100
如果您只需要更改特定分区的偏移量,则可以查看该工具的帮助功能,了解如何执行此操作。
您可以startingOffsets
按照Spark + Kafka集成指南中的说明使用Spark选项:
选项: startingOffsets
值: “最早”,“最新”(仅用于流)或json字符串“”“ {” topicA“:{” 0“:23,” 1“:-1},” topicB“:{” 0“:- 2}}“”“
默认值: “最新”用于流式传输,“最早”用于批处理
含义:查询开始的起点,或者是从最早的偏移量开始的“最早”,从最新的偏移量开始的“最新”,或者是为每个TopicPartition指定起始偏移量的json字符串。在json中,可使用-2作为偏移量来指代最早的,-1到最新的。注意:对于批查询,不允许最新(隐式或在json中使用-1)。对于流查询,这仅在启动新查询时适用,并且恢复将始终从查询中断的地方开始。查询期间新发现的分区最早将开始。
为此,重要的是要有一个“新”查询。这意味着您需要删除现有作业的检查点文件或创建完整的新应用程序。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句