在这种情况下
val dStream : Stream[_] =
dStream.foreachRDD(a => ... )
dStream.foreachRDD(b => ... )
执行foreach方法:
我想知道这一点,因为我想在数据库插入后提交kafka offset。(并且db连接器仅提供一个“ foreach”插入)
val dStream : Stream[_] = ...().cache()
dStream.toDb // consume the stream
dStream.foreachRDD(b => //commit offset ) //consume the stream but after the db insert
在Spark UI中,看起来好像有命令,但是我不确定它是否可靠。
编辑:如果foreachRDD(a =>)失败,是否仍执行foreachRDD(b =>)?
DStream.foreach
从Spark 0.9.0开始不推荐使用。您希望以等价物DStream.foreachRDD
开始。
Spark DAG中的阶段是按顺序执行的,因为一个转换的输出通常也是图形中下一转换的输入,但是在您的示例中并非如此。
发生的是内部将RDD划分为多个分区。每个分区都在群集管理器可用的不同工作器上运行。在您的示例中,DStream.foreach(a => ...)
将在之前执行DStream.foreach(b => ...)
,但foreach
就内部RDD
迭代而言,其中的执行将并行运行。
我想知道这一点,因为我想在数据库插入后提交kafka offset。
的DStream.foreachRDD
是输出变换,这意味着它会引起火花兑现该图并开始执行。您可以放心地假定,在执行第二个数据库之前,将结束对数据库的插入foreach
,但是请记住,第一个foreach
数据库将在并行foreach分区中更新数据库RDD
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句