Spark에서 직렬화 오류를 방지하기 위해 직렬화 할 수없는 개체 분리

아니 메쉬 판 데이

다음 클래스는 Elasticsearch에서 읽기를 시도하고 반환 된 문서를 인쇄하는 주요 함수를 포함합니다.

object TopicApp extends Serializable {

  def run() {

    val start = System.currentTimeMillis()

    val sparkConf = new Configuration()
    sparkConf.set("spark.executor.memory","1g")
    sparkConf.set("spark.kryoserializer.buffer","256")

    val es = new EsContext(sparkConf)
    val esConf = new Configuration()
    esConf.set("es.nodes","localhost")
    esConf.set("es.port","9200")
    esConf.set("es.resource", "temp_index/some_doc")
    esConf.set("es.query", "?q=*:*")
    esConf.set("es.fields", "_score,_id")

    val documents = es.documents(esConf)
    documents.foreach(println)

    val end = System.currentTimeMillis()
    println("Total time: " + (end-start) + " ms")

    es.shutdown()

  }

  def main(args: Array[String]) {
    run()
  }

}

다음 클래스는 반환 된 문서를 JSON으로 변환합니다. org.json4s

class EsContext(sparkConf:HadoopConfig) extends SparkBase {
  private val sc = createSCLocal("ElasticContext", sparkConf)

  def documentsAsJson(esConf:HadoopConfig):RDD[String] = {
    implicit val formats = DefaultFormats
    val source = sc.newAPIHadoopRDD(
      esConf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text],
      classOf[MapWritable]
    )
    val docs = source.map(
      hit => {
        val doc = Map("ident" -> hit._1.toString) ++ mwToMap(hit._2)
        write(doc)
      }
    )
    docs
  }

  def shutdown() = sc.stop()

  // mwToMap() converts MapWritable to Map

}

다음 클래스 SparkContext는 응용 프로그램 의 로컬 만듭니다 .

trait SparkBase extends Serializable {
  protected def createSCLocal(name:String, config:HadoopConfig):SparkContext = {
    val iterator = config.iterator()
    for (prop <- iterator) {
      val k = prop.getKey
      val v = prop.getValue
      if (k.startsWith("spark."))
        System.setProperty(k, v)
    }
    val runtime = Runtime.getRuntime
    runtime.gc()

    val conf = new SparkConf()
    conf.setMaster("local[2]")

    conf.setAppName(name)
    conf.set("spark.serializer", classOf[KryoSerializer].getName)

    conf.set("spark.ui.port", "0")

    new SparkContext(conf)
  }
}

실행할 때 TopicApp다음과 같은 오류가 발생합니다.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at TopicApp.EsContext.documents(EsContext.scala:51)
    at TopicApp.TopicApp$.run(TopicApp.scala:28)
    at TopicApp.TopicApp$.main(TopicApp.scala:39)
    at TopicApp.TopicApp.main(TopicApp.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@14f70e7d)
    - field (class: TopicApp.EsContext, name: sc, type: class org.apache.spark.SparkContext)
    - object (class TopicApp.EsContext, TopicApp.EsContext@2cf77cdc)
    - field (class: TopicApp.EsContext$$anonfun$documents$1, name: $outer, type: class TopicApp.EsContext)
    - object (class TopicApp.EsContext$$anonfun$documents$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 13 more

비슷한 문제를 다루는 다른 게시물을 살펴보면 클래스를 만들 Serializable거나 직렬화 할 수없는 객체를 클래스에서 분리하는 것이 좋습니다 .

내가 가지고있는 오류에서 나는 것을 유추 SparkContextscSparkContext가 직렬화 클래스 아니므로 비 직렬화입니다.

애플리케이션이 올바르게 실행되도록 SparkContext를 어떻게 분리해야합니까?

차크 조 하르

확실히 프로그램을 실행할 수는 없지만 RDD의 데이터에서 실행해야하는 경우 직렬화 할 수없는 클래스의 멤버를 참조하는 익명 함수를 만드는 것이 일반적인 규칙이 아닙니다. 귀하의 경우 :

  • EsContextSparkContext(의도적으로) 직렬화 할 수없는 유형의 val이 있습니다.
  • 에 전달 된 익명 함수에서 RDD.mapEsContext.documentsAsJson,이의 또 다른 함수를 호출 EsContext인스턴스 ( mwToMap인스턴스을 따라가 SparkContext으로 보유하는 것이 직렬화하는 힘 스파크)

한 가지 가능한 해결책은 클래스 mwToMap에서 제거하는 것입니다 EsContext( 가능하면 동반 객체EsContext-객체는 정적이므로 직렬화 할 필요가 없음). 같은 성격 ( write?) 의 다른 방법이있는 경우에도 이동해야합니다. 이것은 다음과 같습니다.

import EsContext._

class EsContext(sparkConf:HadoopConfig) extends SparkBase {
   private val sc = createSCLocal("ElasticContext", sparkConf)

   def documentsAsJson(esConf: HadoopConfig): RDD[String] = { /* unchanged */ }
   def documents(esConf: HadoopConfig): RDD[EsDocument] = { /* unchanged */ }
   def shutdown() = sc.stop()
}

object EsContext {
   private def mwToMap(mw: MapWritable): Map[String, String] = { ... }
}

이러한 메서드를 옮길 수없는 경우 (즉, EsContext의 멤버 중 일부가 필요한 경우 )-실제 매핑을 수행하는 클래스를이 컨텍스트 (SparkContext를 둘러싼 일종의 래퍼처럼 보이는)에서 분리하는 것이 좋습니다. 그건 그입니다 모두 )가되어야한다고.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

역 직렬화 할 수없는 일부 개체를 포함하는 배열 역 직렬화 (역 직렬화 가능 부분 구하기)

분류에서Dev

개체 목록에 하위 요소를 역 직렬화 할 수 없습니다.

분류에서Dev

Spark : 직렬화 할 수없는 개체

분류에서Dev

매개 변수없는 생성자가 없기 때문에 개체를 직렬화 할 수 없습니다.

분류에서Dev

복잡하고 직렬화 할 수없는 개체를 Android 조각에 전달하는 방법

분류에서Dev

PostAsJsonAsync는 사전에서 개체를 직렬화 할 수 없습니다.

분류에서Dev

Spark API를 사용하여 MySQL에 쓸 수 없음-pickle.PicklingError : 개체를 직렬화 할 수 없음

분류에서Dev

PySpark Pipeline.fit (df) 메서드는 PicklingError : 개체를 직렬화 할 수 없음 : ValueError : Elephas를 사용하는 동안 하위 문자열을 찾을 수 없음

분류에서Dev

xcode에서 모델링하는 새 속성은 NSLocalizedDescription에서 발생합니다.-관리되는 개체를 직렬화 할 수 없습니다.

분류에서Dev

직렬화 된 제네릭 목록에서 개체 위치를 찾는 방법

분류에서Dev

C ++ MFC에서 이진 직렬화 된 개체를 C #에서 역 직렬화 할 수 있습니까?

분류에서Dev

MainViewModel이 직렬화 가능으로 표시되지 않았기 때문에 C # 개체를 직렬화 할 수 없습니다.

분류에서Dev

pexpect 및 다중 처리 사용 오류? 오류 "TypError : '_io.TextIOWrapper'개체를 직렬화 할 수 없습니다."

분류에서Dev

Windows Phone 7에서 개체를 직렬화하는 방법

분류에서Dev

Azure에서 모든 개체를 직렬화하지 않는 JSON

분류에서Dev

C #의 IMGUR API에서 현재 JSON 개체 오류를 역 직렬화 할 수 없습니다.

분류에서Dev

C #에서 배열 개체를 직렬화 할 수 없습니다.

분류에서Dev

컨트롤러에서 json 개체를 역 직렬화 할 수 없습니다.

분류에서Dev

C #에서 3 개의 하위 개체를 포함하는 JSON 개체 역 직렬화

분류에서Dev

직렬화 할 수없고 구문 분석 할 수없는 개체를 RemoteService에서 활동으로 보낼 수 있습니까?

분류에서Dev

파일에서 개체를 역 직렬화 및 재 직렬화하는 데 문제가있는 C # JSON.net

분류에서Dev

내 부모 개체의 하위 클래스를 직렬화 할 수 없습니다.

분류에서Dev

멤버가 직접 직렬화 할 수 없지만 str () 표현이있는 객체를 직렬화하는 방법은 무엇입니까?

분류에서Dev

현재 JSON 개체를 역 직렬화 할 수 없습니다. 그룹 열을 사용하여 현재 JSON 개체를 역 직렬화 할 수 없습니다.

분류에서Dev

보유하고있는 객체를 직렬화하지 않고 ArrayList를 직렬화 할 수 있습니까?

분류에서Dev

xml 문서를 역 직렬화 할 수 있지만 동일한 문서로 직렬화 할 수 없음

분류에서Dev

DataContractJsonSerializer를 사용하여 개체를 JSON으로 직렬화 할 수 없음

분류에서Dev

직렬화 할 수없는 개체를 수정하는 방법은 "HttpSession"개체에 저장하면 안됩니다 (squid : S2441).

분류에서Dev

C #에서 열거 형을 직렬화 할 때 어셈블리 ID 오류를 해결하는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    역 직렬화 할 수없는 일부 개체를 포함하는 배열 역 직렬화 (역 직렬화 가능 부분 구하기)

  2. 2

    개체 목록에 하위 요소를 역 직렬화 할 수 없습니다.

  3. 3

    Spark : 직렬화 할 수없는 개체

  4. 4

    매개 변수없는 생성자가 없기 때문에 개체를 직렬화 할 수 없습니다.

  5. 5

    복잡하고 직렬화 할 수없는 개체를 Android 조각에 전달하는 방법

  6. 6

    PostAsJsonAsync는 사전에서 개체를 직렬화 할 수 없습니다.

  7. 7

    Spark API를 사용하여 MySQL에 쓸 수 없음-pickle.PicklingError : 개체를 직렬화 할 수 없음

  8. 8

    PySpark Pipeline.fit (df) 메서드는 PicklingError : 개체를 직렬화 할 수 없음 : ValueError : Elephas를 사용하는 동안 하위 문자열을 찾을 수 없음

  9. 9

    xcode에서 모델링하는 새 속성은 NSLocalizedDescription에서 발생합니다.-관리되는 개체를 직렬화 할 수 없습니다.

  10. 10

    직렬화 된 제네릭 목록에서 개체 위치를 찾는 방법

  11. 11

    C ++ MFC에서 이진 직렬화 된 개체를 C #에서 역 직렬화 할 수 있습니까?

  12. 12

    MainViewModel이 직렬화 가능으로 표시되지 않았기 때문에 C # 개체를 직렬화 할 수 없습니다.

  13. 13

    pexpect 및 다중 처리 사용 오류? 오류 "TypError : '_io.TextIOWrapper'개체를 직렬화 할 수 없습니다."

  14. 14

    Windows Phone 7에서 개체를 직렬화하는 방법

  15. 15

    Azure에서 모든 개체를 직렬화하지 않는 JSON

  16. 16

    C #의 IMGUR API에서 현재 JSON 개체 오류를 역 직렬화 할 수 없습니다.

  17. 17

    C #에서 배열 개체를 직렬화 할 수 없습니다.

  18. 18

    컨트롤러에서 json 개체를 역 직렬화 할 수 없습니다.

  19. 19

    C #에서 3 개의 하위 개체를 포함하는 JSON 개체 역 직렬화

  20. 20

    직렬화 할 수없고 구문 분석 할 수없는 개체를 RemoteService에서 활동으로 보낼 수 있습니까?

  21. 21

    파일에서 개체를 역 직렬화 및 재 직렬화하는 데 문제가있는 C # JSON.net

  22. 22

    내 부모 개체의 하위 클래스를 직렬화 할 수 없습니다.

  23. 23

    멤버가 직접 직렬화 할 수 없지만 str () 표현이있는 객체를 직렬화하는 방법은 무엇입니까?

  24. 24

    현재 JSON 개체를 역 직렬화 할 수 없습니다. 그룹 열을 사용하여 현재 JSON 개체를 역 직렬화 할 수 없습니다.

  25. 25

    보유하고있는 객체를 직렬화하지 않고 ArrayList를 직렬화 할 수 있습니까?

  26. 26

    xml 문서를 역 직렬화 할 수 있지만 동일한 문서로 직렬화 할 수 없음

  27. 27

    DataContractJsonSerializer를 사용하여 개체를 JSON으로 직렬화 할 수 없음

  28. 28

    직렬화 할 수없는 개체를 수정하는 방법은 "HttpSession"개체에 저장하면 안됩니다 (squid : S2441).

  29. 29

    C #에서 열거 형을 직렬화 할 때 어셈블리 ID 오류를 해결하는 방법은 무엇입니까?

뜨겁다태그

보관