다음 클래스는 Elasticsearch에서 읽기를 시도하고 반환 된 문서를 인쇄하는 주요 함수를 포함합니다.
object TopicApp extends Serializable {
def run() {
val start = System.currentTimeMillis()
val sparkConf = new Configuration()
sparkConf.set("spark.executor.memory","1g")
sparkConf.set("spark.kryoserializer.buffer","256")
val es = new EsContext(sparkConf)
val esConf = new Configuration()
esConf.set("es.nodes","localhost")
esConf.set("es.port","9200")
esConf.set("es.resource", "temp_index/some_doc")
esConf.set("es.query", "?q=*:*")
esConf.set("es.fields", "_score,_id")
val documents = es.documents(esConf)
documents.foreach(println)
val end = System.currentTimeMillis()
println("Total time: " + (end-start) + " ms")
es.shutdown()
}
def main(args: Array[String]) {
run()
}
}
다음 클래스는 반환 된 문서를 JSON으로 변환합니다. org.json4s
class EsContext(sparkConf:HadoopConfig) extends SparkBase {
private val sc = createSCLocal("ElasticContext", sparkConf)
def documentsAsJson(esConf:HadoopConfig):RDD[String] = {
implicit val formats = DefaultFormats
val source = sc.newAPIHadoopRDD(
esConf,
classOf[EsInputFormat[Text, MapWritable]],
classOf[Text],
classOf[MapWritable]
)
val docs = source.map(
hit => {
val doc = Map("ident" -> hit._1.toString) ++ mwToMap(hit._2)
write(doc)
}
)
docs
}
def shutdown() = sc.stop()
// mwToMap() converts MapWritable to Map
}
다음 클래스 SparkContext
는 응용 프로그램 의 로컬 을 만듭니다 .
trait SparkBase extends Serializable {
protected def createSCLocal(name:String, config:HadoopConfig):SparkContext = {
val iterator = config.iterator()
for (prop <- iterator) {
val k = prop.getKey
val v = prop.getValue
if (k.startsWith("spark."))
System.setProperty(k, v)
}
val runtime = Runtime.getRuntime
runtime.gc()
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName(name)
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("spark.ui.port", "0")
new SparkContext(conf)
}
}
실행할 때 TopicApp
다음과 같은 오류가 발생합니다.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at TopicApp.EsContext.documents(EsContext.scala:51)
at TopicApp.TopicApp$.run(TopicApp.scala:28)
at TopicApp.TopicApp$.main(TopicApp.scala:39)
at TopicApp.TopicApp.main(TopicApp.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@14f70e7d)
- field (class: TopicApp.EsContext, name: sc, type: class org.apache.spark.SparkContext)
- object (class TopicApp.EsContext, TopicApp.EsContext@2cf77cdc)
- field (class: TopicApp.EsContext$$anonfun$documents$1, name: $outer, type: class TopicApp.EsContext)
- object (class TopicApp.EsContext$$anonfun$documents$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
비슷한 문제를 다루는 다른 게시물을 살펴보면 클래스를 만들 Serializable
거나 직렬화 할 수없는 객체를 클래스에서 분리하는 것이 좋습니다 .
내가 가지고있는 오류에서 나는 것을 유추 SparkContext
즉 sc
SparkContext가 직렬화 클래스 아니므로 비 직렬화입니다.
애플리케이션이 올바르게 실행되도록 SparkContext를 어떻게 분리해야합니까?
확실히 프로그램을 실행할 수는 없지만 RDD의 데이터에서 실행해야하는 경우 직렬화 할 수없는 클래스의 멤버를 참조하는 익명 함수를 만드는 것이 일반적인 규칙이 아닙니다. 귀하의 경우 :
EsContext
SparkContext
(의도적으로) 직렬화 할 수없는 유형의 val이 있습니다.RDD.map
의 EsContext.documentsAsJson
,이의 또 다른 함수를 호출 EsContext
인스턴스 ( mwToMap
인스턴스을 따라가 SparkContext으로 보유하는 것이 직렬화하는 힘 스파크)한 가지 가능한 해결책은 클래스 mwToMap
에서 제거하는 것입니다 EsContext
( 가능하면 동반 객체 로 EsContext
-객체는 정적이므로 직렬화 할 필요가 없음). 같은 성격 ( write
?) 의 다른 방법이있는 경우에도 이동해야합니다. 이것은 다음과 같습니다.
import EsContext._
class EsContext(sparkConf:HadoopConfig) extends SparkBase {
private val sc = createSCLocal("ElasticContext", sparkConf)
def documentsAsJson(esConf: HadoopConfig): RDD[String] = { /* unchanged */ }
def documents(esConf: HadoopConfig): RDD[EsDocument] = { /* unchanged */ }
def shutdown() = sc.stop()
}
object EsContext {
private def mwToMap(mw: MapWritable): Map[String, String] = { ... }
}
이러한 메서드를 옮길 수없는 경우 (즉, EsContext
의 멤버 중 일부가 필요한 경우 )-실제 매핑을 수행하는 클래스를이 컨텍스트 (SparkContext를 둘러싼 일종의 래퍼처럼 보이는)에서 분리하는 것이 좋습니다. 그건 그입니다 모두 )가되어야한다고.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다