有没有一种方法可以在不等待所有分区完成执行的情况下将结果流式传输到驱动程序?

神户约翰

有没有一种方法可以将结果流式传输到驱动程序,而无需等待所有分区完成执行?

我是Spark的新手,因此,如果有更好的方法,请指出正确的方向。我想并行执行大量分区,并使用spark来处理分发/重新启动等操作。完成操作后,我想将结果收集到驱动程序中的单个存档中。

toLocalIterator()

我已经能够做到这一点toLocalIterator()根据文档,它限制了驱动程序所需的资源。因此,它基本上可以工作。

问题在于,toLocalIterator()不仅将驱动程序一次限制为一个分区,而且似乎一次只能执行一个分区。这对我没有用。该行为在下面的演示代码中进行了演示。

使用persist()+ count()+toLocalIterator()

我发现我可以通过持久化然后使用触发并行执行来解决此问题count()之后,toLocalIterator()便可以快速提取预先计算的结果。

问题是我有很多分区(大约10 ^ 3或10 ^ 4),每个分区大约需要15分钟。这样最终会保留大量数据(不是很多),但更糟糕的是,一旦整个工作持续太长时间,它似乎就失去了持久性。分区最终被重新计算。我正在与可抢占的工作人员一起使用google dataproc,因此这可能与它有关系,但是我很确定它最终甚至在固定工作人员上也要重新计算...我不确定到底发生了什么。

无论如何,在访问第一个结果之前必须执行所有分区似乎并不理想。

下面的演示代码演示了一切都可以很好地持续并且迭代不会触发重新计算的最佳情况。

??? ->迭代数据而无需等待完整执行

有没有类似的东西?

复制/粘贴演示代码

import time
import pyspark.storagelevel

def slow_square(n):
    time.sleep(5)
    return n**2


with pyspark.SparkContext() as spark_context:
    numbers = spark_context.parallelize(range(4), 4)  # I think 4 is default executors locally
    squares = numbers.map(slow_square)

    # Use toLocalIterator()
    start = time.time()
    list(squares.toLocalIterator())
    print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 20s

    # Use count() to show that it's faster in parallel
    start = time.time()
    squares.count()
    print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s

    # Use persist() + count() + toLocalIterator()
    start = time.time()
    squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
    squares.count()
    list(squares.toLocalIterator())
    print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s
零323

一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最少。这样做的主要原因有两个:

  • 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
  • 驱动程序实际上是批处理应用程序中的单点故障。

在正常情况下,您只需要继续进行作业,写入持久性存储,然后最终对结果应用进一步的处理步骤即可。

如果您希望能够迭代访问结果,则可以选择以下几种方法:

  • 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
  • 使用foreach/处理数据foreachPartition,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要附加组件,但从概念上讲可能更容易(您可以使用反压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。
  • Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。

    警告以下代码仅是概念验证。它没有经过适当的测试,很有可能是非常不可靠的

    AccumulatorParam使用RXPy的示例

    # results_param.py
    
    from rx.subjects import Subject
    from pyspark import AccumulatorParam, TaskContext
    
    class ResultsParam(AccumulatorParam, Subject):
        """An observable accumulator which collects task results"""
        def zero(self, v):
            return []
    
        def addInPlace(self, acc1, acc2):
            # This is executed on the workers so we have to
            # merge the results
            if (TaskContext.get() is not None and 
                    TaskContext().get().partitionId() is not None):
                acc1.extend(acc2)
                return acc1
            else:
                # This is executed on the driver so we discard the results
                # and publish to self instead
                for x in acc2:
                    self.on_next(x)
                return []
    

    简单的Spark应用程序(Python 3.x):

    # main.py
    
    import time
    from pyspark import SparkContext, TaskContext
    
    sc = SparkContext(master="local[4]")
    sc.addPyFile("results_param.py")
    
    from results_param import ResultsParam
    
    # Define accumulator
    acc = sc.accumulator([], ResultsParam())
    
    # Dummy subscriber 
    acc.accum_param.subscribe(print)
    
    def process(x):
        """Identity proccess"""
        result = x
        acc.add([result])
    
        # Add some delay
        time.sleep(5)
    
        return result
    
    sc.parallelize(range(32), 8).foreach(process)
    

    这是相对简单的,但是如果多个任务同时完成,则存在驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。

  • runJob直接使用Scala (不支持Python)。

    实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如reduce您可以查看实现Scala

    应该可以使用这种机制将分区推送到Python进程,但是我还没有尝试过。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

有没有一种方法可以在CQL驱动程序中使用QueryBuilder执行“ SELECT JSON ...”?

来自分类Dev

有没有一种方法可以将多个桌面流式传输到一台计算机?

来自分类Dev

有没有一种方法可以将Azure信息保护活动日志流式传输到事件中心?

来自分类Dev

有没有一种方法可以以低延迟将音频从Windows流传输到macOS?

来自分类Dev

有没有一种方法可以在不创建jar的情况下将Spark应用程序提交到集群?

来自分类Dev

有没有一种方法可以在没有CellEditor的情况下选择JTable中单元格中的所有文本?

来自分类Dev

有没有一种方法可以在没有全盘加密的情况下加密我的文件

来自分类Dev

有没有一种方法可以在没有全盘加密的情况下加密我的文件

来自分类Dev

有没有一种方法可以在没有JavaScript的情况下动态更改内容?

来自分类Dev

有没有一种方法可以使用Selenium / Web驱动程序在<div>中获取所有HTML元素ID?

来自分类Dev

有没有一种方法可以“重新启动”触摸板驱动程序?

来自分类Dev

有没有一种方法可以同步使用MongoDB C#驱动程序

来自分类Dev

有没有一种方法可以“重新启动”触摸板驱动程序?

来自分类Dev

有没有一种方法可以强制Windows使用Precision Touchpad驱动程序而不是Synaptics?

来自分类Dev

有没有一种方法可以同步使用MongoDB C#驱动程序

来自分类Dev

有没有一种方法可以在没有ssl的情况下将页面添加到facebook选项卡?

来自分类Dev

有没有一种方法可以在不使用Drush的情况下从bash脚本安装Drupal?

来自分类Dev

有没有一种方法可以在不使用队列或Blob存储的情况下触发webjob?

来自分类Dev

Tkinter:默认情况下,有没有一种方法可以选中复选框?

来自分类Dev

有没有一种方法可以在不打开文件的情况下提取访问模块?

来自分类Dev

有没有一种方法可以在不安装设置的情况下创建内部键盘?

来自分类Dev

有没有一种方法可以在不设置webhook的情况下获得深层链接数据?

来自分类Dev

有没有一种方法可以在不授予用户权限的情况下更新Firestore的文档?

来自分类Dev

有没有一种方法可以在不命名结构的情况下使用指针在结构内部?

来自分类Dev

Tkinter:默认情况下,有没有一种方法可以选中复选框?

来自分类Dev

有没有一种方法可以在不列出参数的情况下编写此Javascript函数?

来自分类Dev

有没有一种方法可以在不重写现有类名的情况下将类名添加到blazor组件?

来自分类Dev

有没有一种方法可以将属性绑定到如果未绑定的情况下应具有的值

来自分类Dev

有没有一种方法可以执行按钮单击下的方法?

Related 相关文章

  1. 1

    有没有一种方法可以在CQL驱动程序中使用QueryBuilder执行“ SELECT JSON ...”?

  2. 2

    有没有一种方法可以将多个桌面流式传输到一台计算机?

  3. 3

    有没有一种方法可以将Azure信息保护活动日志流式传输到事件中心?

  4. 4

    有没有一种方法可以以低延迟将音频从Windows流传输到macOS?

  5. 5

    有没有一种方法可以在不创建jar的情况下将Spark应用程序提交到集群?

  6. 6

    有没有一种方法可以在没有CellEditor的情况下选择JTable中单元格中的所有文本?

  7. 7

    有没有一种方法可以在没有全盘加密的情况下加密我的文件

  8. 8

    有没有一种方法可以在没有全盘加密的情况下加密我的文件

  9. 9

    有没有一种方法可以在没有JavaScript的情况下动态更改内容?

  10. 10

    有没有一种方法可以使用Selenium / Web驱动程序在<div>中获取所有HTML元素ID?

  11. 11

    有没有一种方法可以“重新启动”触摸板驱动程序?

  12. 12

    有没有一种方法可以同步使用MongoDB C#驱动程序

  13. 13

    有没有一种方法可以“重新启动”触摸板驱动程序?

  14. 14

    有没有一种方法可以强制Windows使用Precision Touchpad驱动程序而不是Synaptics?

  15. 15

    有没有一种方法可以同步使用MongoDB C#驱动程序

  16. 16

    有没有一种方法可以在没有ssl的情况下将页面添加到facebook选项卡?

  17. 17

    有没有一种方法可以在不使用Drush的情况下从bash脚本安装Drupal?

  18. 18

    有没有一种方法可以在不使用队列或Blob存储的情况下触发webjob?

  19. 19

    Tkinter:默认情况下,有没有一种方法可以选中复选框?

  20. 20

    有没有一种方法可以在不打开文件的情况下提取访问模块?

  21. 21

    有没有一种方法可以在不安装设置的情况下创建内部键盘?

  22. 22

    有没有一种方法可以在不设置webhook的情况下获得深层链接数据?

  23. 23

    有没有一种方法可以在不授予用户权限的情况下更新Firestore的文档?

  24. 24

    有没有一种方法可以在不命名结构的情况下使用指针在结构内部?

  25. 25

    Tkinter:默认情况下,有没有一种方法可以选中复选框?

  26. 26

    有没有一种方法可以在不列出参数的情况下编写此Javascript函数?

  27. 27

    有没有一种方法可以在不重写现有类名的情况下将类名添加到blazor组件?

  28. 28

    有没有一种方法可以将属性绑定到如果未绑定的情况下应具有的值

  29. 29

    有没有一种方法可以执行按钮单击下的方法?

热门标签

归档