我正在本地模式下使用Spark集群运行PySpark,并且试图将流式DataFrame写入Kafka主题。
运行查询时,收到以下消息:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
这是我的代码:
query = (
output_stream
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "ratings-cleaned")
.option("checkpointLocation", "checkpoints-folder")
.start()
)
sleep(2)
print(query.status)
自上次查询以来,从源主题中删除了一些消息/偏移后,通常会显示此错误消息。删除是由于清理策略(例如保留时间)而发生的。
想象一下,您的主题中包含偏移量0、1、2的消息已全部由应用程序处理。检查点文件存储最后一个偏移量2,以记住下次启动偏移量3时继续。
一段时间后,向该主题生成了偏移量为3、4、5的消息,但是将偏移量为0、1、2、3的消息从主题中删除到保留。
新增功能,在重新启动Spark结构化流作业时,它尝试根据其检查点文件获取3,但意识到只有偏移量为4的消息才可用。在这种情况下,它将抛出此异常。
您可以通过解决此问题
.option("failOnDataLoss", "false")
在readStream
操作中进行设置,或根据《结构化流+ Kafka集成指南》,该选项failOnDataLoss
描述为:
“是否有可能在数据丢失(例如,主题被删除或偏移量超出范围)时使查询失败。这可能是一个错误的警报。您可以在其无法正常工作时将其禁用。批处理如果由于丢失数据而无法从提供的偏移量中读取任何数据,查询将始终失败。”
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句