在Spark Streaming中从Kafka反序列化Avro格式的数据会给出空字符串和0长时间

传输

我正在努力反序列化来自Spark Streaming中来自Kafka的Avro序列化数据。

这是我通过spark-submit运行的文件:

package com.example.mymessage

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object MyMessageCount extends Logging {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: MyMessageCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN]." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("MyMessageCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD(rdd => {
      rdd.foreach(avroRecord => {
        val schemaString = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"string\",\"type\":\"string\"},{\"name\":\"long\",\"type\":\"long\"}]}"
        val parser = new Schema.Parser()
        val schema = parser.parse(schemaString)
        val reader = new GenericDatumReader[GenericRecord](schema)

        val decoder = DecoderFactory.get.binaryDecoder(avroRecord.toCharArray.map(_.toByte), null)
        val record: GenericRecord = reader.read(null, decoder)

        System.out.println(avroRecord + "," + record.toString 
          + ", string= " + record.get("string")
          + ", long=" + record.get("long"))
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

我一直在使用Confluent平台在本地发送数据。

如果我发送:

{"string":"test","long":30}

然后上面的代码输出:

test<,{"string": "", "long": 0}, string= , long=0

这向我表明数据正在通过,但是由于某种原因,字符串和long值作为看起来像默认值的值出现。如何访问avroRecord来自Kafka的真实“字符串”和“长”值

传输

为此,将Confluent的KafkaAvroDecoder与直接流结合使用即可。

import io.confluent.kafka.serializers.KafkaAvroDecoder

...

val kafkaParams = Map[String, String]("metadata.broker.list" -> zkQuorum,
  "schema.registry.url" -> schemaRegistry,
  "auto.offset.reset" -> "smallest")
val topicSet = Set(topics)
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicSet).map(_._2)

val lines = messages.foreachRDD(rdd => {
  rdd.foreach({ avroRecord =>
    println(avroRecord)
  })
})

我发现一个单独的问题,我只能导入版本1,而不能导入更新的版本。

libraryDependencies ++= Seq(
  "io.confluent" % "kafka-avro-serializer" % "1.0",
  ...
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("public"),
  Resolver.url("confluent", url("http://packages.confluent.io/maven/"))
)

更新以下内容用于获取kafka-avro-serializer的最新版本。

libraryDependencies ++= Seq(
  "io.confluent" % "kafka-avro-serializer" % "3.0.0",
  ...
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("public"),
  "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

无法使用Spark结构化流反序列化Avro消息,其中键是字符串序列化,而值是Avro

来自分类Dev

Spark和Spark Streaming中的时间序列预测

来自分类Dev

由于UTFDataFormatException引起的任务无法在Spark中序列化:编码字符串太长

来自分类Dev

Spark Streaming + Spark SQL

来自分类Dev

Spark Streaming + Spark SQL

来自分类Dev

spark-jobserver序列化格式

来自分类Dev

Spark和不可序列化的DateTimeFormatter

来自分类Dev

spark组和序列化列表

来自分类Dev

在Spark 2.0.1中读取和写入空字符串“” vs NULL

来自分类Dev

Spark序列化异常

来自分类Dev

Kafka和TextSocket Stream中的Spark Streaming数据分发

来自分类Dev

使用Spark Streaming读取Kafka记录时出现不可序列化的异常

来自分类Dev

Spark程序性能-GC和任务反序列化和并发执行

来自分类Dev

Spark Streaming Kafka流

来自分类Dev

在Spark Streaming(Spark 2.0)中使用Kafka

来自分类Dev

Spark:由于自定义字符串比较,在Filter命令期间无法序列化任务

来自分类Dev

WebAPI将反序列化映射空字符串反序列化为“ null”

来自分类Dev

Spark CSV writer输出空字符串的双引号

来自分类常见问题

将嵌套的null值转换为数据帧内部的空字符串spark

来自分类Dev

写入spark数据框时用空字符串替换null

来自分类Dev

在Spark / Scala中组合字符串

来自分类Dev

PySpark中的Spark Pivot字符串

来自分类Dev

Pyspark替换Spark数据框列中的字符串

来自分类Dev

在数据集Apache Spark中拆分字符串

来自分类Dev

rdd 与数据帧 Spark 上的序列化

来自分类Dev

Apache Spark任务不可序列化

来自分类Dev

Java的Apache Spark对象序列化

来自分类Dev

Spark:对象不可序列化

来自分类Dev

Spark 流 + Accumulo - 序列化 BatchWriterImpl

Related 相关文章

热门标签

归档