Spark를 사용하여 HBase 테이블에서 가져온 데이터를 처리하려고합니다. 이 블로그 게시물 은 NewHadoopAPI
Hadoop에서 데이터를 읽는 데 사용하는 방법의 예를 제공합니다 InputFormat
.
내가 뭘 한거지
이 작업을 여러 번 수행해야 SparkContext
하므로 HBase의 주어진 열 집합에서 RDD를 얻을 수 있도록 암시 적을 사용하여을 보강하려고했습니다 . 다음 도우미를 작성했습니다.
trait HBaseReadSupport {
implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)
implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}
final class HBaseSC(sc: SparkContext) extends Serializable {
def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
data map { case (cf, columns) =>
val content = columns map { column =>
val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)
column -> interpret(CellUtil.cloneValue(cell))
} toMap
cf -> content
}
def makeConf(table: String) = {
val conf = HBaseConfiguration.create()
conf.setBoolean("hbase.cluster.distributed", true)
conf.setInt("hbase.client.scanner.caching", 10000)
conf.set(TableInputFormat.INPUT_TABLE, table)
conf
}
def hbase[A](table: String, data: Map[String, List[String]])
(interpret: Array[Byte] => A) =
sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
Bytes.toString(key.get) -> extract(data, row, interpret)
}
}
다음과 같이 사용할 수 있습니다.
val rdd = sc.hbase[String](table, Map(
"cf" -> List("col1", "col2")
))
이 경우 (String, Map[String, Map[String, String]])
첫 번째 구성 요소는 rowkey이고 두 번째 구성 요소는 키가 column family이고 값은 키가 열이고 내용이 셀 값인 맵인 맵인 RDD를 얻습니다 .
실패한 곳
불행히도 내 직업은 sc
. 내가 작업을 실행할 때 얻는 것은
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
도우미 클래스를 제거하고 내 작업에서 동일한 논리를 인라인으로 사용할 수 있으며 모든 것이 정상적으로 실행됩니다. 하지만 동일한 상용구를 반복해서 작성하는 대신 재사용 할 수있는 것을 얻고 싶습니다.
그건 그렇고,이 문제는 암시적인 것에 만 국한된 것이 아니며, 기능을 사용하더라도 sc
동일한 문제 를 나타냅니다.
비교를 위해 TSV 파일을 읽는 다음 도우미 (인용문 등을 지원하지 않기 때문에 깨 졌다는 것을 알고 있습니다)가 제대로 작동하는 것 같습니다.
trait TsvReadSupport {
implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}
final class TsvRDD(val sc: SparkContext) extends Serializable {
def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
val contents = line.split(separator).toList
(fields, contents).zipped.toMap
}
}
의도하지 않게 SparkContext를 캡처하지 않고 HBase에서 행을 읽는 논리를 캡슐화하려면 어떻게해야합니까?
변수 에 @transient
주석을 추가 하기 만하면 됩니다 sc
.
final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
...
}
워커에서 사용할 수 없으므로 함수 sc
내에서 사용되지 않는지 확인하십시오 extract
.
분산 계산 내에서 Spark 컨텍스트에 액세스해야하는 경우 rdd.context
함수를 사용할 수 있습니다.
val rdd = sc.newAPIHadoopRDD(...)
rdd map {
case (k, v) =>
val ctx = rdd.context
....
}
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다