:)
我已经陷入了一个(奇怪的)情况,在短暂的情况下,我不想使用Kafka的任何新记录,因此请暂停该主题中所有分区的sparkStreaming消耗(InputDStream [ConsumerRecord]),执行一些操作并最后,恢复使用记录。
首先...这可能吗?
我一直在尝试这样的事情:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但是我得到了:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
欢迎您提供任何帮助,以帮助我了解我所缺少的内容,以及为什么在清楚地为用户分配了分区后为什么我得到空的结果!
版本:Kafka:0.10 Spark:2.3.0 Scala:2.11.8
是的,可以在代码中添加检查点并通过持久存储(本地磁盘,S3,HDFS)路径
并且每当您开始/恢复工作时,它将从检查点获取具有消费者偏移的Kafka消费者组信息,并从停止位置开始进行处理。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Spark Check- = pointing不仅是一种用于保存偏移量的机制,而且还可以保存Stages和Jobs的DAG的序列化状态。因此,每当您使用新代码重新启动工作时,
现在,从磁盘读取只是Spark加载Kafka偏移,DAG和旧的不完整处理数据所需的一次性操作。
完成后,它将始终按照默认值或指定的检查点间隔继续将数据保存到磁盘。
Spark流式传输提供了一个选项来指定Kafka组ID,但Spark结构化的流则没有。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句