Pyspark에서 목록의 모든 값을 추가하는 방법은 무엇입니까?

뜨내기

jupyter 노트북에서 아래 pyspark 변환을 실행하고 있습니다. 내 요구 사항은 469 + 84451 + 903 ...과 같은 요소의 모든 값을 추가하는 것이며 총 개수 만 반환해야합니다.

다음은 변화와 행동입니다.

In [46]: newdispokey1.collect()

[(u'Hello', 469),
 (u'is', 84451),
 (u'the', 903),
 (u'an', 21208),
 (u'and', 19903),
 (u'route', 185),
 (u'bag', 1894),
 (u'metal', 315),
 (u'bus', 620194),
 (u'cloud', 1036)]

예상 결과는 모든 값을 더한 것입니다. 나는 아래의 변형과 행동을 시도하고 있습니다.

   In [46]: newdispokey1.reduce( lambda x,y: x[1]+y[1] ).collect()

및 오류가 발생합니다.

    ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-9557bb70b499> in <module>()
----> 1 newdispokey1.reduce( lambda x,y: x[1]+y[1] ).collect()

/home/newuser/spark/python/pyspark/rdd.pyc in reduce(self, f)
    797             yield reduce(f, iterator, initial)
    798 
--> 799         vals = self.mapPartitions(func).collect()
    800         if vals:
    801             return reduce(f, vals)

/home/newuser/spark/python/pyspark/rdd.pyc in collect(self)
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775 

/home/newuser/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/newuser/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/home/newuser/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 167, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/newuser/spark/python/pyspark/rdd.py", line 797, in func
    yield reduce(f, iterator, initial)
  File "<ipython-input-46-9557bb70b499>", line 1, in <lambda>
TypeError: 'int' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/newuser/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/newuser/spark/python/pyspark/rdd.py", line 797, in func
    yield reduce(f, iterator, initial)
  File "<ipython-input-46-9557bb70b499>", line 1, in <lambda>
TypeError: 'int' object has no attribute '__getitem__'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Apache Spark를 처음 사용하며이 문제를 해결하는 방법은 무엇입니까?

엘라 자르

가장 간단한 해결책은

>>> newdispokey1.values().sum()
750558

문제는 reduce메서드 에 대한 매개 변수 유형-감속기 유형에 있습니다. 하나는 이전 결과이거나 첫 번째 요소이고 두 번째는 새 요소입니다. 따라서 다른 요소처럼 보이는 요소를 반환해야합니다. 한 쌍을 반환해야합니다.

>>> newdispokey1.reduce(lambda x, y: ('', x[1] + y[1]))
('', 750558)

또는 더 정교하게 변경하려는 경우 :

>>> newdispokey1.map(lambda x: x[1]).reduce(lambda x, y: x + y)
750558

첫 번째 솔루션과 동일하지만 읽기가 어렵습니다.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

목록 내의 모든 튜플에 하나의 값을 추가하는 방법은 무엇입니까?

분류에서Dev

목록 내의 모든 요소에 값을 추가하는 방법은 무엇입니까?

분류에서Dev

UnmodifiableSet의 모든 값을 ArrayList에 추가하는 방법은 무엇입니까?

분류에서Dev

모든 MonoBehaviours of Scene을 목록에 추가하는 방법은 무엇입니까?

분류에서Dev

공통 목록의 모든 루프에서 출력 값을 나열하는 방법은 무엇입니까?

분류에서Dev

Where 절에서 목록의 모든 값을 확인하는 방법은 무엇입니까?

분류에서Dev

스칼라의지도 목록에서 모든 값을 얻는 방법은 무엇입니까?

분류에서Dev

immutable.js가 만든 목록에서 모든 값을 가져 오는 방법은 무엇입니까?

분류에서Dev

목록의 첫 번째 항목을 해당 목록의 모든 순열 끝에 추가하는 방법은 무엇입니까?

분류에서Dev

MATLAB의 구조체에있는 모든 필드의 값을 추가하는 방법은 무엇입니까?

분류에서Dev

사전의 목록에 값을 추가하는 방법은 무엇입니까?

분류에서Dev

Js에서 모든 트리 배열의 객체에 항목을 추가하는 방법은 무엇입니까?

분류에서Dev

중첩 목록에서 모든 하위 목록을 추출하는 방법은 무엇입니까?

분류에서Dev

R의 모든 NA에 열의 마지막 값을 추가하는 방법은 무엇입니까?

분류에서Dev

Angular에서 모듈의 종속성 목록에 종속성을 추가하는 방법은 무엇입니까?

분류에서Dev

카운터의 모든 값에 N을 추가하는 방법은 무엇입니까? -파이썬

분류에서Dev

내가 어떻게 목록에서 같은 해에 모든 값을 추가하는 방법은 무엇입니까?

분류에서Dev

Ruby 배열의 모든 항목에 동일한 항목을 추가하는 방법은 무엇입니까?

분류에서Dev

목록에서 가장 높은 숫자의 모든 색인을 찾는 방법은 무엇입니까?

분류에서Dev

목록에서 가장 높은 숫자의 모든 색인을 찾는 방법은 무엇입니까?

분류에서Dev

목록에서 가장 높은 숫자의 모든 발생을 찾는 방법은 무엇입니까?

분류에서Dev

사전 목록에서 하나의 목록 값을 포함하는 모든 사전을 제거하거나 제거하는 방법은 무엇입니까?

분류에서Dev

순서가 지정되지 않은 목록의 모든 요소에 확인란을 추가하는 방법은 무엇입니까?

분류에서Dev

하이브에서만 NULL 값을 갖는 모든 열 목록을 얻는 방법은 무엇입니까?

분류에서Dev

파이썬에서 목록의 모든 항목을 순서대로 바꾸는 방법은 무엇입니까?

분류에서Dev

문자열 목록의 모든 항목 중에서 공통 부분을 찾는 방법은 무엇입니까?

분류에서Dev

Android의 ListView에서 모든 TextView 값을 얻는 방법은 무엇입니까?

분류에서Dev

Android의 ListView에서 모든 TextView 값을 얻는 방법은 무엇입니까?

분류에서Dev

dojo의 FilteringSelect에서 모든 값을 얻는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    목록 내의 모든 튜플에 하나의 값을 추가하는 방법은 무엇입니까?

  2. 2

    목록 내의 모든 요소에 값을 추가하는 방법은 무엇입니까?

  3. 3

    UnmodifiableSet의 모든 값을 ArrayList에 추가하는 방법은 무엇입니까?

  4. 4

    모든 MonoBehaviours of Scene을 목록에 추가하는 방법은 무엇입니까?

  5. 5

    공통 목록의 모든 루프에서 출력 값을 나열하는 방법은 무엇입니까?

  6. 6

    Where 절에서 목록의 모든 값을 확인하는 방법은 무엇입니까?

  7. 7

    스칼라의지도 목록에서 모든 값을 얻는 방법은 무엇입니까?

  8. 8

    immutable.js가 만든 목록에서 모든 값을 가져 오는 방법은 무엇입니까?

  9. 9

    목록의 첫 번째 항목을 해당 목록의 모든 순열 끝에 추가하는 방법은 무엇입니까?

  10. 10

    MATLAB의 구조체에있는 모든 필드의 값을 추가하는 방법은 무엇입니까?

  11. 11

    사전의 목록에 값을 추가하는 방법은 무엇입니까?

  12. 12

    Js에서 모든 트리 배열의 객체에 항목을 추가하는 방법은 무엇입니까?

  13. 13

    중첩 목록에서 모든 하위 목록을 추출하는 방법은 무엇입니까?

  14. 14

    R의 모든 NA에 열의 마지막 값을 추가하는 방법은 무엇입니까?

  15. 15

    Angular에서 모듈의 종속성 목록에 종속성을 추가하는 방법은 무엇입니까?

  16. 16

    카운터의 모든 값에 N을 추가하는 방법은 무엇입니까? -파이썬

  17. 17

    내가 어떻게 목록에서 같은 해에 모든 값을 추가하는 방법은 무엇입니까?

  18. 18

    Ruby 배열의 모든 항목에 동일한 항목을 추가하는 방법은 무엇입니까?

  19. 19

    목록에서 가장 높은 숫자의 모든 색인을 찾는 방법은 무엇입니까?

  20. 20

    목록에서 가장 높은 숫자의 모든 색인을 찾는 방법은 무엇입니까?

  21. 21

    목록에서 가장 높은 숫자의 모든 발생을 찾는 방법은 무엇입니까?

  22. 22

    사전 목록에서 하나의 목록 값을 포함하는 모든 사전을 제거하거나 제거하는 방법은 무엇입니까?

  23. 23

    순서가 지정되지 않은 목록의 모든 요소에 확인란을 추가하는 방법은 무엇입니까?

  24. 24

    하이브에서만 NULL 값을 갖는 모든 열 목록을 얻는 방법은 무엇입니까?

  25. 25

    파이썬에서 목록의 모든 항목을 순서대로 바꾸는 방법은 무엇입니까?

  26. 26

    문자열 목록의 모든 항목 중에서 공통 부분을 찾는 방법은 무엇입니까?

  27. 27

    Android의 ListView에서 모든 TextView 값을 얻는 방법은 무엇입니까?

  28. 28

    Android의 ListView에서 모든 TextView 값을 얻는 방법은 무엇입니까?

  29. 29

    dojo의 FilteringSelect에서 모든 값을 얻는 방법은 무엇입니까?

뜨겁다태그

보관