我正在尝试实现一个使用自定义接收器从 SQS 读取消息的流作业。每条消息都包含对 S3 文件的单个引用,然后我想读取、解析该文件并将其存储为 ORC。
这是我到目前为止的代码:
val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(5))
val sqs = streamContext.receiverStream(new SQSReceiver("events-elb")
.credentials("accessKey", "secretKey")
.at(Regions.US_EAST_1)
.withTimeout(5))
val s3File = sqs.map(messages => {
val sqsMsg: JsValue = Json.parse(messages)
val s3Key = "s3://" +
Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
val rawLogs = sc.textFile(s3Key)
rawLogs
}).saveAsTextFiles("/tmp/output")
不幸的是,这失败并出现以下错误:
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@52fc5eb1)
- field (class: SparrowOrc$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class SparrowOrc$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
这是一种不正确的使用方式sc.textFile
吗?如果是这样,我可以使用什么方法将从 SQS 收到的每个文件路径转发到文件阅读器进行处理?
FWIW,val s3File
最终是mappedDStream
.
对于进一步的上下文,我使用它作为我的接收器:https : //github.com/imapi/spark-sqs-receiver。
实际上,我们不能sparkContext
在map
操作中使用,因为在一个阶段转换的闭包在没有SparkContext
定义的执行器中运行。
解决这个问题的方法是将过程分为两部分:首先,我们使用现有的来计算文件map
,但要textFile
在transform
操作中使用:
val s3Keys = sqs.map(messages => {
val sqsMsg: JsValue = Json.parse(messages)
val s3Key = "s3://" +
Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
}
val files DStream = s3Keys.transform{keys =>
val fileKeys= keys.collect()
Val files = fileKeys.map(f=>
sparkContext.textFile(f))
sparkContext.union(files)
}
filesDStream.saveAsTextFiles(..)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句