직렬화 문제없이 SparkContext 강화

안드레아

Spark를 사용하여 HBase 테이블에서 가져온 데이터를 처리하려고합니다. 이 블로그 게시물NewHadoopAPIHadoop에서 데이터를 읽는 데 사용하는 방법의 예를 제공합니다 InputFormat.

내가 뭘 한거지

이 작업을 여러 번 수행해야 SparkContext하므로 HBase의 주어진 열 집합에서 RDD를 얻을 수 있도록 암시 적을 사용하여을 보강하려고했습니다 . 다음 도우미를 작성했습니다.

trait HBaseReadSupport {
  implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

  implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
  def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
    data map { case (cf, columns) =>
      val content = columns map { column =>
        val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

        column -> interpret(CellUtil.cloneValue(cell))
      } toMap

      cf -> content
    }

  def makeConf(table: String) = {
    val conf = HBaseConfiguration.create()

    conf.setBoolean("hbase.cluster.distributed", true)
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, table)

    conf
  }

  def hbase[A](table: String, data: Map[String, List[String]])
    (interpret: Array[Byte] => A) =

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
        Bytes.toString(key.get) -> extract(data, row, interpret)
      }

}

다음과 같이 사용할 수 있습니다.

val rdd = sc.hbase[String](table, Map(
  "cf" -> List("col1", "col2")
))

이 경우 (String, Map[String, Map[String, String]])첫 번째 구성 요소는 rowkey이고 두 번째 구성 요소는 키가 column family이고 값은 키가 열이고 내용이 셀 값인 맵인 맵인 RDD를 얻습니다 .

실패한 곳

불행히도 내 직업은 sc. 내가 작업을 실행할 때 얻는 것은

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

도우미 클래스를 제거하고 내 작업에서 동일한 논리를 인라인으로 사용할 수 있으며 모든 것이 정상적으로 실행됩니다. 하지만 동일한 상용구를 반복해서 작성하는 대신 재사용 할 수있는 것을 얻고 싶습니다.

그건 그렇고,이 문제는 암시적인 것에 만 국한된 것이 아니며, 기능을 사용하더라도 sc동일한 문제 나타냅니다.

비교를 위해 TSV 파일을 읽는 다음 도우미 (인용문 등을 지원하지 않기 때문에 깨 졌다는 것을 알고 있습니다)가 제대로 작동하는 것 같습니다.

trait TsvReadSupport {
  implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
  def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
    val contents = line.split(separator).toList

    (fields, contents).zipped.toMap
  }
}

의도하지 않게 SparkContext를 캡처하지 않고 HBase에서 행을 읽는 논리를 캡슐화하려면 어떻게해야합니까?

들불

변수 @transient주석을 추가 하기 만하면 됩니다 sc.

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
  ...
}

워커에서 사용할 수 없으므로 함수 sc내에서 사용되지 않는지 확인하십시오 extract.

분산 계산 내에서 Spark 컨텍스트에 액세스해야하는 경우 rdd.context함수를 사용할 수 있습니다.

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
  case (k, v) => 
    val ctx = rdd.context
    ....
}

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Spark : 작업을 직렬화 할 수 없음 (Broadcast / RDD / SparkContext)

분류에서Dev

직렬화 문제

분류에서Dev

로컬 akka 직렬화 강제

분류에서Dev

plpgsql로 직렬화 / 직렬화 해제

분류에서Dev

RestSharp 역 직렬화 문제

분류에서Dev

RestSharp 역 직렬화 문제

분류에서Dev

역 직렬화 문제

분류에서Dev

이상한 Java 직렬화 문제

분류에서Dev

데이터 직렬화 문제

분류에서Dev

C # / JSON 개체 직렬화, 역 직렬화 및 대 / 소문자 문제 없음

분류에서Dev

C # JSON 직렬화 / 비 직렬화-INPUT 문제

분류에서Dev

C # JSON 직렬화 / 비 직렬화-INPUT 문제

분류에서Dev

제네릭 직렬화 Typescript

분류에서Dev

XmlObjectSerializer 문서 역 직렬화

분류에서Dev

Gson : Jackson이 직렬화 한 시간 직렬화 해제

분류에서Dev

XML 직렬화 저장 컬렉션을 별도의 이진 파일로 강화

분류에서Dev

이진 직렬화 InvalidCastException

분류에서Dev

HashSet 직렬화 / 복제 문제

분류에서Dev

Protobuf 맵 직렬화 해제 문제

분류에서Dev

다 대다 필드 직렬화 문제

분류에서Dev

XML 파일 직렬화 문제

분류에서Dev

@ManyToMany 관계의 Jackson 직렬화 문제

분류에서Dev

C # XML 역 직렬화 문제

분류에서Dev

SignalR 인수 직렬화 문제

분류에서Dev

SignalR 인수 직렬화 문제

분류에서Dev

문자열 직렬화 해제

분류에서Dev

WCF 개체 직렬화 문제

분류에서Dev

자바-직렬화-EOFException 문제

분류에서Dev

Spring JSON 역 직렬화 문자 제한