有没有一种方法可以将结果流式传输到驱动程序,而无需等待所有分区完成执行?
我是Spark的新手,因此,如果有更好的方法,请指出正确的方向。我想并行执行大量分区,并使用spark来处理分发/重新启动等操作。完成操作后,我想将结果收集到驱动程序中的单个存档中。
toLocalIterator()
我已经能够做到这一点toLocalIterator()
,根据文档,它限制了驱动程序所需的资源。因此,它基本上可以工作。
问题在于,toLocalIterator()
不仅将驱动程序一次限制为一个分区,而且似乎一次只能执行一个分区。这对我没有用。该行为在下面的演示代码中进行了演示。
persist()
+ count()
+toLocalIterator()
我发现我可以通过持久化然后使用触发并行执行来解决此问题count()
。之后,toLocalIterator()
便可以快速提取预先计算的结果。
问题是我有很多分区(大约10 ^ 3或10 ^ 4),每个分区大约需要15分钟。这样最终会保留大量数据(不是很多),但更糟糕的是,一旦整个工作持续太长时间,它似乎就失去了持久性。分区最终被重新计算。我正在与可抢占的工作人员一起使用google dataproc,因此这可能与它有关系,但是我很确定它最终甚至在固定工作人员上也要重新计算...我不确定到底发生了什么。
无论如何,在访问第一个结果之前必须执行所有分区似乎并不理想。
下面的演示代码演示了一切都可以很好地持续并且迭代不会触发重新计算的最佳情况。
有没有类似的东西?
import time
import pyspark.storagelevel
def slow_square(n):
time.sleep(5)
return n**2
with pyspark.SparkContext() as spark_context:
numbers = spark_context.parallelize(range(4), 4) # I think 4 is default executors locally
squares = numbers.map(slow_square)
# Use toLocalIterator()
start = time.time()
list(squares.toLocalIterator())
print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 20s
# Use count() to show that it's faster in parallel
start = time.time()
squares.count()
print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
# Use persist() + count() + toLocalIterator()
start = time.time()
squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
squares.count()
list(squares.toLocalIterator())
print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最少。这样做的主要原因有两个:
在正常情况下,您只需要继续进行作业,写入持久性存储,然后最终对结果应用进一步的处理步骤即可。
如果您希望能够迭代访问结果,则可以选择以下几种方法:
foreach
/处理数据foreachPartition
,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要附加组件,但从概念上讲可能更容易(您可以使用反压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。
警告:以下代码仅是概念验证。它没有经过适当的测试,很有可能是非常不可靠的。
AccumulatorParam
使用RXPy的示例
# results_param.py
from rx.subjects import Subject
from pyspark import AccumulatorParam, TaskContext
class ResultsParam(AccumulatorParam, Subject):
"""An observable accumulator which collects task results"""
def zero(self, v):
return []
def addInPlace(self, acc1, acc2):
# This is executed on the workers so we have to
# merge the results
if (TaskContext.get() is not None and
TaskContext().get().partitionId() is not None):
acc1.extend(acc2)
return acc1
else:
# This is executed on the driver so we discard the results
# and publish to self instead
for x in acc2:
self.on_next(x)
return []
简单的Spark应用程序(Python 3.x):
# main.py
import time
from pyspark import SparkContext, TaskContext
sc = SparkContext(master="local[4]")
sc.addPyFile("results_param.py")
from results_param import ResultsParam
# Define accumulator
acc = sc.accumulator([], ResultsParam())
# Dummy subscriber
acc.accum_param.subscribe(print)
def process(x):
"""Identity proccess"""
result = x
acc.add([result])
# Add some delay
time.sleep(5)
return result
sc.parallelize(range(32), 8).foreach(process)
这是相对简单的,但是如果多个任务同时完成,则存在驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。
runJob
直接使用Scala (不支持Python)。
实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如,reduce
您可以查看实现Scala。
应该可以使用这种机制将分区推送到Python进程,但是我还没有尝试过。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句