在SparkStreaming中暂停和恢复KafkaConsumer

Borja

:)

我已经陷入了一个(奇怪的)情况,在短暂的情况下,我不想使用Kafka的任何新记录,因此请暂停该主题中所有分区的sparkStreaming消耗(InputDStream [ConsumerRecord]),执行一些操作并最后,恢复使用记录。

首先...这可能吗?

我一直在尝试这样的事情:

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

但是我得到了:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

欢迎您提供任何帮助,以帮助我了解我所缺少的内容,以及为什么在清楚地为用户分配了分区后为什么我得到空的结果!

版本:Kafka:0.10 Spark:2.3.0 Scala:2.11.8

水银

是的,可以在代码中添加检查点并通过持久存储(本地磁盘,S3,HDFS)路径

并且每当您开始/恢复工作时,它将从检查点获取具有消费者偏移的Kafka消费者组信息,并从停止位置开始进行处理。

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Spark Check- = pointing不仅是一种用于保存偏移量的机制,而且还可以保存Stages和Jobs的DAG的序列化状态。因此,每当您使用新代码重新启动工作时,

  1. 读取和处理序列化数据
  2. 如果您的Spark App中的代码有任何更改,请清理缓存的DAG阶段
  3. 从具有最新代码的新数据中恢复处理。

现在,从磁盘读取只是Spark加载Kafka偏移,DAG和旧的不完整处理数据所需的一次性操作

完成后,它将始终按照默认值或指定的检查点间隔继续将数据保存到磁盘。

Spark流式传输提供了一个选项来指定Kafka组ID,但Spark结构化的流则没有。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Libgdx中的暂停和恢复

来自分类Dev

暂停和恢复功能

来自分类Dev

暂停和恢复BackgroundWorker

来自分类Dev

暂停和恢复下载

来自分类Dev

如何在Android中暂停和恢复录音

来自分类Dev

如何在JavaScript中暂停和恢复计时器?

来自分类Dev

在R中暂停和恢复插入符号训练

来自分类Dev

暂停和恢复如何在Android的协程中工作

来自分类Dev

如何在JavaScript中暂停和恢复多个setTimeout?

来自分类Dev

暂停和恢复本地存储中的计数值

来自分类Dev

暂停和恢复圆形动画

来自分类Dev

SpriteKit暂停和恢复SKView

来自分类Dev

暂停和恢复书脊动画

来自分类Dev

Swift:暂停和恢复NSTimer

来自分类Dev

如何暂停和恢复流程

来自分类Dev

暂停并恢复javascript中的setInterval

来自分类Dev

暂停并恢复javascript中的setInterval

来自分类Dev

Pygame 无法从暂停中恢复

来自分类Dev

摇摆动画暂停和恢复

来自分类Dev

暂停和恢复SwingWorker.doInBackground()

来自分类Dev

如何暂停和恢复jQuery间隔

来自分类Dev

状态恢复,如何拦截和暂停SKScene

来自分类Dev

停止,中断,暂停和恢复Java线程

来自分类Dev

暂停和恢复线程活动

来自分类Dev

如何暂停和恢复UIView.animateWithDuration

来自分类Dev

JMeter-按需暂停(和恢复)执行

来自分类Dev

状态恢复,如何拦截和暂停SKScene

来自分类Dev

停止,中断,暂停和恢复Java线程

来自分类Dev

摇摆动画暂停和恢复