When I create an RDD from a list in Spark, the SparkContext usually shuts down as soon as I try to run an RDD operation on it.
Here is the code that triggers the crash, followed by the stack trace. Any guidance is greatly appreciated!
import sys
import numpy as np
import pyspark

SC = pyspark.SparkContext("local", "Crash app")

for i in xrange(10):
    randArray = np.random.rand(10**i)
    randRdd = SC.parallelize(randArray)
    print "Size of the RDD is ", randRdd.count()
    sys.stdout.flush()
This produces the following stack trace:
Size of the RDD is 1
Size of the RDD is 10
Size of the RDD is 100
Size of the RDD is 1000
Size of the RDD is 10000
Size of the RDD is 100000
Size of the RDD is 1000000
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-7e69d839c2b5> in <module>()
4
5 randRdd = SC.parallelize(randArray)
----> 6 print "Size of the RDD is " + str(randRdd.count())
7 sys.stdout.flush()
/usr/local/spark/python/pyspark/rdd.pyc in count(self)
706 3
707 """
--> 708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
709
710 def stats(self):
/usr/local/spark/python/pyspark/rdd.pyc in sum(self)
697 6.0
698 """
--> 699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
700
701 def count(self):
/usr/local/spark/python/pyspark/rdd.pyc in reduce(self, f)
617 if acc is not None:
618 yield acc
--> 619 vals = self.mapPartitions(func).collect()
620 return reduce(f, vals)
621
/usr/local/spark/python/pyspark/rdd.pyc in collect(self)
581 """
582 with _JavaStackTrace(self.context) as st:
--> 583 bytesInJava = self._jrdd.collect().iterator()
584 return list(self._collect_iterator_through_file(bytesInJava))
585
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o103.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
10,000,000 is a lot. np.random.rand(10**7) builds an array of ten million 64-bit floats on the driver, roughly 80 MB of raw data, and considerably more once it is pickled and shipped through parallelize. One million of those numbers fits comfortably in the memory an ordinary PC gives Spark by default, but ten times that may not. I believe your SparkContext is being shut down because of an underlying out-of-memory problem at that step.
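If the point is simply to produce a large RDD of random numbers, one workaround is to generate the data on the workers instead of building the full NumPy array on the driver and pushing it through parallelize. The sketch below is a minimal illustration of that idea, not the asker's code; the partition count and the generate_chunk helper are hypothetical choices:

import numpy as np
import pyspark

SC = pyspark.SparkContext("local", "Distributed random app")

n = 10**7             # total number of random values wanted
numPartitions = 8     # hypothetical; tune to your cores and memory
perPart = n // numPartitions

def generate_chunk(_):
    # Runs per partition: each task builds only its own slice, so the
    # full 10**7-element array never has to exist on the driver.
    return np.random.rand(perPart)

randRdd = SC.parallelize(xrange(numPartitions), numPartitions).flatMap(generate_chunk)
print "Size of the RDD is ", randRdd.count()

Alternatively, giving the driver more memory (for example via spark-submit's --driver-memory option) may let the original loop survive a few more iterations, but generating the data in a distributed way scales much further.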