SparkContext.textFile 可以与自定义接收器一起使用吗?

jpavs

我正在尝试实现一个使用自定义接收器从 SQS 读取消息的流作业。每条消息都包含对 S3 文件的单个引用,然后我想读取、解析该文件并将其存储为 ORC。

这是我到目前为止的代码:

val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(5))

val sqs = streamContext.receiverStream(new SQSReceiver("events-elb")
  .credentials("accessKey", "secretKey")
  .at(Regions.US_EAST_1)
  .withTimeout(5))

val s3File = sqs.map(messages => {
  val sqsMsg: JsValue = Json.parse(messages)
  val s3Key = "s3://" +
    Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
    Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
  val rawLogs = sc.textFile(s3Key)

  rawLogs
}).saveAsTextFiles("/tmp/output")

不幸的是,这失败并出现以下错误:

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@52fc5eb1)
    - field (class: SparrowOrc$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class SparrowOrc$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这是一种不正确的使用方式sc.textFile吗?如果是这样,我可以使用什么方法将从 SQS 收到的每个文件路径转发到文件阅读器进行处理?

FWIW,val s3File最终是mappedDStream.

对于进一步的上下文,我使用它作为我的接收器:https : //github.com/imapi/spark-sqs-receiver

马士革

实际上,我们不能sparkContextmap操作中使用,因为在一个阶段转换的闭包在没有SparkContext定义的执行器中运行

解决这个问题的方法是将过程分为两部分:首先,我们使用现有的来计算文件map,但要textFiletransform操作中使用:

val s3Keys = sqs.map(messages => {
  val sqsMsg: JsValue = Json.parse(messages)
  val s3Key = "s3://" +
  Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") + "/" +
  Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
}
val files DStream = s3Keys.transform{keys => 
    val fileKeys= keys.collect()
    Val files = fileKeys.map(f=>
      sparkContext.textFile(f))
    sparkContext.union(files)
}
filesDStream.saveAsTextFiles(..)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

SparkContext.textFile如何在幕后工作?

来自分类Dev

'SparkContext'对象没有属性'textfile'

来自分类Dev

为什么SparkContext.textFile的partition参数不生效?

来自分类Dev

Spark: how to use SparkContext.textFile for local file system

来自分类Dev

Spark:如何将SparkContext.textFile用于本地文件系统

来自分类Dev

在Spark中,参数“ minPartitions”在SparkContext.textFile(path,minPartitions)中有什么作用?

来自分类Dev

如何使用自定义属性初始化 SparkContext?

来自分类Dev

使用 Scio 将 SCollection 从 textFile 放入 BigQuery

来自分类Dev

如何在Spark TextFile函数中使用自定义换行符?

来自分类Dev

我可以将Twisted GTK Reactor与UDP接收器一起使用吗?

来自分类Dev

GridSearchCV可以与自定义分类器一起使用吗?

来自分类Dev

何时使用SPARK_CLASSPATH或SparkContext.addJar

来自分类Dev

何时使用SPARK_CLASSPATH或SparkContext.addJar

来自分类Dev

如何使用spark sc.textFile获取文件名?

来自分类Dev

如何使用textFile来避免“堆空间不足”?

来自分类Dev

我可以在控制器MVC上获取与数据注释一起使用的自定义错误消息吗?

来自分类常见问题

SparkContext.setCheckpointDir(hdfsPath)可以在不同的Spark应用程序中设置相同的hdfsPath吗?

来自分类Dev

SparkContext.setCheckpointDir(hdfsPath)可以在不同的Spark应用程序中设置相同的hdfsPath吗?

来自分类Dev

可以将自定义信任库与SOAPConnection一起使用吗?

来自分类Dev

validate_uniqueness_of可以与自定义范围一起使用吗?

来自分类Dev

将Webhook与Docusign一起使用-我可以获取自定义字段吗?

来自分类Dev

模板可以与 Aurelia 中的自定义属性一起使用吗?

来自分类Dev

使用appSettings配置自定义Serilog接收器

来自分类Dev

SparkContext对象没有属性esRDD(elasticsearch-spark连接器)

来自分类Dev

在同一个JVM中检测到多个SparkContext

来自分类Dev

在同一母版下的Java和R Apps之间共享SparkContext

来自分类Dev

此JVM中可能仅运行一个SparkContext-Flask

来自分类Dev

引起原因:java.io.NotSerializableException:org.apache.spark.SparkContext-在Spark中使用JdbcRDD时

来自分类Dev

使用远程SparkContext在纱线上运行spark作业:Yarn应用程序已结束

Related 相关文章

  1. 1

    SparkContext.textFile如何在幕后工作?

  2. 2

    'SparkContext'对象没有属性'textfile'

  3. 3

    为什么SparkContext.textFile的partition参数不生效?

  4. 4

    Spark: how to use SparkContext.textFile for local file system

  5. 5

    Spark:如何将SparkContext.textFile用于本地文件系统

  6. 6

    在Spark中,参数“ minPartitions”在SparkContext.textFile(path,minPartitions)中有什么作用?

  7. 7

    如何使用自定义属性初始化 SparkContext?

  8. 8

    使用 Scio 将 SCollection 从 textFile 放入 BigQuery

  9. 9

    如何在Spark TextFile函数中使用自定义换行符?

  10. 10

    我可以将Twisted GTK Reactor与UDP接收器一起使用吗?

  11. 11

    GridSearchCV可以与自定义分类器一起使用吗?

  12. 12

    何时使用SPARK_CLASSPATH或SparkContext.addJar

  13. 13

    何时使用SPARK_CLASSPATH或SparkContext.addJar

  14. 14

    如何使用spark sc.textFile获取文件名?

  15. 15

    如何使用textFile来避免“堆空间不足”?

  16. 16

    我可以在控制器MVC上获取与数据注释一起使用的自定义错误消息吗?

  17. 17

    SparkContext.setCheckpointDir(hdfsPath)可以在不同的Spark应用程序中设置相同的hdfsPath吗?

  18. 18

    SparkContext.setCheckpointDir(hdfsPath)可以在不同的Spark应用程序中设置相同的hdfsPath吗?

  19. 19

    可以将自定义信任库与SOAPConnection一起使用吗?

  20. 20

    validate_uniqueness_of可以与自定义范围一起使用吗?

  21. 21

    将Webhook与Docusign一起使用-我可以获取自定义字段吗?

  22. 22

    模板可以与 Aurelia 中的自定义属性一起使用吗?

  23. 23

    使用appSettings配置自定义Serilog接收器

  24. 24

    SparkContext对象没有属性esRDD(elasticsearch-spark连接器)

  25. 25

    在同一个JVM中检测到多个SparkContext

  26. 26

    在同一母版下的Java和R Apps之间共享SparkContext

  27. 27

    此JVM中可能仅运行一个SparkContext-Flask

  28. 28

    引起原因:java.io.NotSerializableException:org.apache.spark.SparkContext-在Spark中使用JdbcRDD时

  29. 29

    使用远程SparkContext在纱线上运行spark作业:Yarn应用程序已结束

热门标签

归档