重新启动Spark作业时,如果馈入kafka的数据遇到意外格式,会发生什么情况

香卡

我对Kafka的Spark结构化流媒体有疑问。假设我正在执行紧急工作,并且一切正常。一天,我的火花作业失败,原因是送入kafka的数据不一致。不一致可能是诸如数据格式问题或无法处理的垃圾字符之类的东西。在这种情况下,我们如何解决该问题?有没有办法进入kafka主题并手动更改数据?

如果我们不解决数据问题并重新启动Spark作业,它将读取导致失败的同一行,因为我们尚未提交检查点。所以我们如何摆脱这个循环。如何修复Kafka主题中的数据问题以恢复中止的Spark作业?

麦克风

除非您真的知道自己在做什么,否则我将避免尝试在Kafka主题中手动更改一条消息。

为防止将来发生这种情况,您可能要考虑对数据使用架构(与架构注册表结合)。

为了缓解您描述的问题,我看到以下选项:

  • 手动更改结构化流应用程序的使用者组的偏移量
  • 创建一个“新”流作业,该作业从特定的偏移量开始读取

手动更改偏移

使用Sparks结构化流时,Spark会自动设置使用者组。根据代码,消费者组将定义为:

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

您可以使用该kafka-consumer-groups工具更改偏移量首先通过以下方式确定消费者组的实际名称

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

然后为特定主题的该消费者组设置偏移量(例如,偏移量100)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100

如果您只需要更改特定分区的偏移量,则可以查看该工具的帮助功能,了解如何执行此操作。

创建新的流作业

您可以startingOffsets按照Spark + Kafka集成指南中的说明使用Spark选项

选项: startingOffsets

值: “最早”,“最新”(仅用于流)或json字符串“”“ {” topicA“:{” 0“:23,” 1“:-1},” topicB“:{” 0“:- 2}}“”“

默认值: “最新”用于流式传输,“最早”用于批处理

含义:查询开始的起点,或者是从最早的偏移量开始的“最早”,从最新的偏移量开始的“最新”,或者是为每个TopicPartition指定起始偏移量的json字符串。在json中,可使用-2作为偏移量来指代最早的,-1到最新的。注意:对于批查询,不允许最新(隐式或在json中使用-1)。对于流查询,这仅在启动新查询时适用,并且恢复将始终从查询中断的地方开始。查询期间新发现的分区最早将开始。

为此,重要的是要有一个“新”查询。这意味着您需要删除现有作业的检查点文件或创建完整的新应用程序。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如果您在不重新启动的情况下安装 2 个更新相同文件的更新,会发生什么情况,如果您不按顺序安装更新会发生什么情况?

来自分类Dev

Ubuntu 18.04 和 Timeshift - 如果我在 timeshift 拍摄快照时重新启动会发生什么

来自分类Dev

重新启动Gunicorn时,Django网站会发生什么?

来自分类Dev

如果shutdown命令杀死了我的统一cron作业,会发生什么情况?

来自分类Dev

在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

来自分类Dev

如果使用ifconfig“启动”网络接口,会发生什么情况?

来自分类Dev

如果我在Spark中两次缓存相同的RDD,会发生什么情况?

来自分类Dev

如果超过100个用户尝试连接到Firebase Spark Plan,会发生什么情况

来自分类Dev

重新分区HDD时标记为坏的扇区会发生什么情况?

来自分类Dev

如果通过SSH连接时更新sshd,会发生什么情况?

来自分类Dev

如果应用程序处于睡眠状态时执行PerformSelector会发生什么情况?

来自分类Dev

如果不将字体嵌入Flash,会发生什么情况?

来自分类Dev

如果rm在提示中途停止,会发生什么情况?

来自分类Dev

如果不关闭Docker容器,会发生什么情况?

来自分类Dev

如果在执行过程中修改了SQL Server作业步骤,会发生什么情况

来自分类Dev

当前没有wsarecv时,传入数据会发生什么情况

来自分类Dev

在部分读取时,unix流辅助数据会发生什么情况?

来自分类Dev

调用exec()时,线程会发生什么情况?

来自分类Dev

在提示中键入“ unset *”时,会发生什么情况?

来自分类Dev

在提示中键入“ unset *”时,会发生什么情况?

来自分类Dev

硬盘突然关闭时,磁头会发生什么情况?

来自分类Dev

将巨大的文件加载到hadoop集群中时,如果客户端在将数据传输到datanode时失败,会发生什么情况?

来自分类Dev

如果我使用--force标志格式化已安装的USB驱动器,会发生什么情况?

来自分类Dev

iptables:重新启动后会发生什么?

来自分类Dev

如果使用Objective C在xcode 7 +,ios 9中创建项目时忘记添加核心数据会发生什么情况

来自分类Dev

如果某些程序正在从一个文件中读取数据时删除输入文件,将会发生什么情况?

来自分类Dev

如果多个用户想要修改Oracle数据库中的相同数据块,会发生什么情况

来自分类Dev

如果在ThreadPool线程正在写入文件时关闭应用程序会发生什么情况?

来自分类Dev

如果您尝试对MongoDB数据库的锁定部分执行操作,会发生什么情况?

Related 相关文章

  1. 1

    如果您在不重新启动的情况下安装 2 个更新相同文件的更新,会发生什么情况,如果您不按顺序安装更新会发生什么情况?

  2. 2

    Ubuntu 18.04 和 Timeshift - 如果我在 timeshift 拍摄快照时重新启动会发生什么

  3. 3

    重新启动Gunicorn时,Django网站会发生什么?

  4. 4

    如果shutdown命令杀死了我的统一cron作业,会发生什么情况?

  5. 5

    在源中更新其基础数据时,结构化流中使用的Spark DataFrame会发生什么情况?

  6. 6

    如果使用ifconfig“启动”网络接口,会发生什么情况?

  7. 7

    如果我在Spark中两次缓存相同的RDD,会发生什么情况?

  8. 8

    如果超过100个用户尝试连接到Firebase Spark Plan,会发生什么情况

  9. 9

    重新分区HDD时标记为坏的扇区会发生什么情况?

  10. 10

    如果通过SSH连接时更新sshd,会发生什么情况?

  11. 11

    如果应用程序处于睡眠状态时执行PerformSelector会发生什么情况?

  12. 12

    如果不将字体嵌入Flash,会发生什么情况?

  13. 13

    如果rm在提示中途停止,会发生什么情况?

  14. 14

    如果不关闭Docker容器,会发生什么情况?

  15. 15

    如果在执行过程中修改了SQL Server作业步骤,会发生什么情况

  16. 16

    当前没有wsarecv时,传入数据会发生什么情况

  17. 17

    在部分读取时,unix流辅助数据会发生什么情况?

  18. 18

    调用exec()时,线程会发生什么情况?

  19. 19

    在提示中键入“ unset *”时,会发生什么情况?

  20. 20

    在提示中键入“ unset *”时,会发生什么情况?

  21. 21

    硬盘突然关闭时,磁头会发生什么情况?

  22. 22

    将巨大的文件加载到hadoop集群中时,如果客户端在将数据传输到datanode时失败,会发生什么情况?

  23. 23

    如果我使用--force标志格式化已安装的USB驱动器,会发生什么情况?

  24. 24

    iptables:重新启动后会发生什么?

  25. 25

    如果使用Objective C在xcode 7 +,ios 9中创建项目时忘记添加核心数据会发生什么情况

  26. 26

    如果某些程序正在从一个文件中读取数据时删除输入文件,将会发生什么情况?

  27. 27

    如果多个用户想要修改Oracle数据库中的相同数据块,会发生什么情况

  28. 28

    如果在ThreadPool线程正在写入文件时关闭应用程序会发生什么情况?

  29. 29

    如果您尝试对MongoDB数据库的锁定部分执行操作,会发生什么情况?

热门标签

归档