当我并行处理大型列表时,Spark Context关闭

栈G

当我从Spark中的列表创建RDD时,通常会在我尝试对其执行RDD操作时立即关闭Spark上下文。

这是导致崩溃的代码,下面是堆栈跟踪。任何指导,不胜感激!

import sys

import numpy as np
import pyspark

SC = pyspark.SparkContext("local", "Crash app")

for i in xrange(10):

    randArray = np.random.rand(10**i)

    randRdd = SC.parallelize(randArray)
    print "Size of the RDD is ", randRdd.count()
    sys.stdout.flush()

生成此堆栈跟踪:

Size of the RDD is 1
Size of the RDD is 10
Size of the RDD is 100
Size of the RDD is 1000
Size of the RDD is 10000
Size of the RDD is 100000
Size of the RDD is 1000000
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-7e69d839c2b5> in <module>()
      4 
      5     randRdd = SC.parallelize(randArray)
----> 6     print "Size of the RDD is " + str(randRdd.count())
      7     sys.stdout.flush()

/usr/local/spark/python/pyspark/rdd.pyc in count(self)
    706         3
    707         """
--> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    709 
    710     def stats(self):

/usr/local/spark/python/pyspark/rdd.pyc in sum(self)
    697         6.0
    698         """
--> 699         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    700 
    701     def count(self):

/usr/local/spark/python/pyspark/rdd.pyc in reduce(self, f)
    617             if acc is not None:
    618                 yield acc
--> 619         vals = self.mapPartitions(func).collect()
    620         return reduce(f, vals)
    621 

/usr/local/spark/python/pyspark/rdd.pyc in collect(self)
    581         """
    582         with _JavaStackTrace(self.context) as st:
--> 583           bytesInJava = self._jrdd.collect().iterator()
    584         return list(self._collect_iterator_through_file(bytesInJava))
    585 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538 
    539         for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o103.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
塞夫

10000000是很多。我不是python方面的专家,但是虽然1000000个(什么是什么?整数?)数字可以容纳到普通PC的内存中,但不能超过十倍。我相信您的上下文由于底层内存问题而被关闭。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

当我并行处理大型列表时,Spark Context关闭

来自分类Dev

checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) 即使我的 GPS 关闭也返回 true

来自分类Dev

当我关闭BufferedInputStream时,基础InputStream是否也关闭了?

来自分类Dev

当我在Vim中按Tab时,关闭引号/括号

来自分类Dev

当我触摸地图android Osm时如何关闭infoWindow

来自分类Dev

当我有多个打开时,关闭单个vim实例

来自分类Dev

当我尝试返回元组时,IDataReader关闭

来自分类Dev

当我单击evrywhere时,如何关闭jQuery弹出窗口

来自分类Dev

当我单击按钮打开此活动时,它关闭

来自分类Dev

每当我启动时,Eclipse都会自动关闭

来自分类Dev

当我更改语言时,大写锁定已关闭

来自分类Dev

当我关闭Nautilus时,复制操作将被取消

来自分类Dev

当我点击按钮时应用程序关闭

来自分类Dev

当我打印 Pandas DataFrame 时 QMainWindow 意外关闭?

来自分类Dev

我正在尝试制作自己的批处理游戏,但是当我尝试运行脚本/代码时,它立即关闭

来自分类Dev

当我明确关闭 Stream 时,关闭 StreamReader(或 StreamWriter)是否有好处?

来自分类Dev

当我关闭启动画面时 WPF 应用程序关闭

来自分类Dev

我关闭时出错

来自分类Dev

当Kafka关闭时,我该如何处理IOException?

来自分类Dev

为什么当我收到大的json消息时,我的websocket连接会关闭?

来自分类Dev

当我按下按钮时,我的android应用会继续关闭

来自分类Dev

当我关闭GCP项目时,我收到了一个跟踪号未知的错误

来自分类Dev

当我单击UserInfo按钮时如果强制关闭我的应用程序

来自分类Dev

wxPython:当我关闭框架时,CheckBox如何记住我的选择

来自分类Dev

当我尝试输入密码时,Bitlocker关闭了我的计算机

来自分类Dev

当且仅当我单击按钮时,我才想关闭导航抽屉

来自分类Dev

每当我的网络设备启动或关闭时,如何自动更改我的mac地址?

来自分类Dev

当我关闭代码块时,我遇到了这个错误

来自分类Dev

当我们关闭开关时,我们如何取消选择该行

Related 相关文章

  1. 1

    当我并行处理大型列表时,Spark Context关闭

  2. 2

    checkSelfPermission(context, Manifest.permission.ACCESS_FINE_LOCATION) 即使我的 GPS 关闭也返回 true

  3. 3

    当我关闭BufferedInputStream时,基础InputStream是否也关闭了?

  4. 4

    当我在Vim中按Tab时,关闭引号/括号

  5. 5

    当我触摸地图android Osm时如何关闭infoWindow

  6. 6

    当我有多个打开时,关闭单个vim实例

  7. 7

    当我尝试返回元组时,IDataReader关闭

  8. 8

    当我单击evrywhere时,如何关闭jQuery弹出窗口

  9. 9

    当我单击按钮打开此活动时,它关闭

  10. 10

    每当我启动时,Eclipse都会自动关闭

  11. 11

    当我更改语言时,大写锁定已关闭

  12. 12

    当我关闭Nautilus时,复制操作将被取消

  13. 13

    当我点击按钮时应用程序关闭

  14. 14

    当我打印 Pandas DataFrame 时 QMainWindow 意外关闭?

  15. 15

    我正在尝试制作自己的批处理游戏,但是当我尝试运行脚本/代码时,它立即关闭

  16. 16

    当我明确关闭 Stream 时,关闭 StreamReader(或 StreamWriter)是否有好处?

  17. 17

    当我关闭启动画面时 WPF 应用程序关闭

  18. 18

    我关闭时出错

  19. 19

    当Kafka关闭时,我该如何处理IOException?

  20. 20

    为什么当我收到大的json消息时,我的websocket连接会关闭?

  21. 21

    当我按下按钮时,我的android应用会继续关闭

  22. 22

    当我关闭GCP项目时,我收到了一个跟踪号未知的错误

  23. 23

    当我单击UserInfo按钮时如果强制关闭我的应用程序

  24. 24

    wxPython:当我关闭框架时,CheckBox如何记住我的选择

  25. 25

    当我尝试输入密码时,Bitlocker关闭了我的计算机

  26. 26

    当且仅当我单击按钮时,我才想关闭导航抽屉

  27. 27

    每当我的网络设备启动或关闭时,如何自动更改我的mac地址?

  28. 28

    当我关闭代码块时,我遇到了这个错误

  29. 29

    当我们关闭开关时,我们如何取消选择该行

热门标签

归档