Spark Parallel Stream - 对象不可序列化

纳纳

我正在使用 Spark 的多输入流阅读器从 Kafka 读取消息。我收到下面提到的错误。如果我不使用多输入流阅读器,则不会出现任何错误。为了实现性能,我需要使用并行概念,测试目的我只使用一个。

错误

java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 120, CreateTime = -1, checksum = 2372777361, serialized key size = -1, serialized value size = 48, key = null, value = 10051,2018-03-15 17:12:24+0000,Bentonville,Gnana))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:239)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/15 17:12:24 ERROR TaskSetManager: Task 0.0 in stage 470.0 (TID 470) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

代码:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.Success
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object ParallelStreamJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
    val kafkaStream = {

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")
      val numPartitionsOfInputTopic = 1
      val streams = (1 to numPartitionsOfInputTopic) map { _ =>
        KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
      }
      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1 
      unifiedStream.repartition(sparkProcessingParallelism)
    }

    kafkaStream.foreachRDD(rdd=> {
      rdd.foreach(conRec=> {
        println(conRec.value())
      })
    })

    println(" Spark parallel reader is ready !!!")

   ssc.start()
    ssc.awaitTermination()

  }
}

sbt

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion ,
  "org.apache.spark" %% "spark-sql" % sparkVersion  ,
  "org.apache.spark" %% "spark-hive" % sparkVersion  ,
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" %  sparkVersion  ,
)

如何解决这个问题?

唐阮

问题很清楚java.io.NotSerializableException:org.apache.kafka.clients.consumer.ConsumerRecordConsumerRecord类不延长Serializable

试着拿出valueConsumerRecordforeachRdd操作kafkaStream.map(_.value())

更新 1:上述修复不起作用,因为异常发生在ssc.union(streams). ssc.union(streams)需要节点之间的数据传输,它必须序列化数据。因此,您可以操作取出value字段以解决问题。mapunion

KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam) ).map(_.value())

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Spark:对象不可序列化

来自分类Dev

由于不可序列化的对象,Spark 作业失败

来自分类Dev

解耦不可序列化的对象以避免Spark中的序列化错误

来自分类Dev

Spark和不可序列化的DateTimeFormatter

来自分类Dev

Apache Spark任务不可序列化

来自分类Dev

spark-Kafka流式传输异常-对象不可序列化

来自分类Dev

JSON 解析器的 Apache Spark 对象不可序列化异常

来自分类Dev

Java的Apache Spark对象序列化

来自分类Dev

Spark Task不可序列化(案例类)

来自分类Dev

通过Spark写入HBase:任务不可序列化

来自分类Dev

Spark上的Redis:任务不可序列化

来自分类Dev

Spark上的Redis:任务不可序列化

来自分类Dev

Spark SQL UDF任务不可序列化

来自分类Dev

Twitter Spark流过滤:任务不可序列化异常

来自分类Dev

Spark Accumulator抛出“任务不可序列化”错误

来自分类Dev

任务在Spark中产生了不可序列化的结果

来自分类Dev

Databricks Apache Spark 1.4:任务不可序列化(Scala)

来自分类Dev

Spark:任务不可序列化(广播/ RDD / SparkContext)

来自分类Dev

Spark SQL UDF任务不可序列化

来自分类Dev

带序列化的Scala反射(通过Spark)-符号不可序列化

来自分类Dev

带序列化的Scala反射(通过Spark)-符号不可序列化

来自分类Dev

Spark HBase连接错误:对象不可序列化类:org.apache.hadoop.hbase.client.Result

来自分类Dev

类扩展可序列化时,Apache Spark任务不可序列化

来自分类Dev

类扩展可序列化时,Apache Spark任务不可序列化

来自分类Dev

Spark rawcomparator上序列化对象的比较

来自分类Dev

Spark序列化异常

来自分类Dev

集成Spark SQL和Spark流时出现不可序列化异常

来自分类Dev

集成Spark SQL和Spark流时出现不可序列化的异常

来自分类Dev

RDD不可序列化Cassandra / Spark连接器Java API

Related 相关文章

  1. 1

    Spark:对象不可序列化

  2. 2

    由于不可序列化的对象,Spark 作业失败

  3. 3

    解耦不可序列化的对象以避免Spark中的序列化错误

  4. 4

    Spark和不可序列化的DateTimeFormatter

  5. 5

    Apache Spark任务不可序列化

  6. 6

    spark-Kafka流式传输异常-对象不可序列化

  7. 7

    JSON 解析器的 Apache Spark 对象不可序列化异常

  8. 8

    Java的Apache Spark对象序列化

  9. 9

    Spark Task不可序列化(案例类)

  10. 10

    通过Spark写入HBase:任务不可序列化

  11. 11

    Spark上的Redis:任务不可序列化

  12. 12

    Spark上的Redis:任务不可序列化

  13. 13

    Spark SQL UDF任务不可序列化

  14. 14

    Twitter Spark流过滤:任务不可序列化异常

  15. 15

    Spark Accumulator抛出“任务不可序列化”错误

  16. 16

    任务在Spark中产生了不可序列化的结果

  17. 17

    Databricks Apache Spark 1.4:任务不可序列化(Scala)

  18. 18

    Spark:任务不可序列化(广播/ RDD / SparkContext)

  19. 19

    Spark SQL UDF任务不可序列化

  20. 20

    带序列化的Scala反射(通过Spark)-符号不可序列化

  21. 21

    带序列化的Scala反射(通过Spark)-符号不可序列化

  22. 22

    Spark HBase连接错误:对象不可序列化类:org.apache.hadoop.hbase.client.Result

  23. 23

    类扩展可序列化时,Apache Spark任务不可序列化

  24. 24

    类扩展可序列化时,Apache Spark任务不可序列化

  25. 25

    Spark rawcomparator上序列化对象的比较

  26. 26

    Spark序列化异常

  27. 27

    集成Spark SQL和Spark流时出现不可序列化异常

  28. 28

    集成Spark SQL和Spark流时出现不可序列化的异常

  29. 29

    RDD不可序列化Cassandra / Spark连接器Java API

热门标签

归档