簡単なアプリケーションでSparkStreamingをKafkaに接続しようとしています。このアプリケーションは、Sparkのドキュメントの例で作成しました。実行しようとすると、次のような例外が発生します。
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:59)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
at producer.KafkaProducer$.main(KafkaProducer.scala:36)
at producer.KafkaProducer.main(KafkaProducer.scala)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.4
at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
これが私のコードです:
object KafkaProducer {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
val topics = Array("topic1", "topic2")
def kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
lines.map(_.key())
ssc.start()
ssc.awaitTermination()
問題が構成にあるのか、コード自体にあるのかわかりません。build.sbtファイルは次のようになります。
scalaVersion := "2.11.4"
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "1.1.0",
"org.apache.spark" %% "spark-core" % "2.3.0",
"org.apache.spark" %% "spark-sql" % "2.3.0",
"org.apache.spark" %% "spark-streaming" % "2.3.0",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)
何が悪いのかわからないので、助けていただければ幸いです。
直面した例外のスタックトレースをたどると、主な問題は次のとおりです。
原因:com.fasterxml.jackson.databind.JsonMappingException:互換性のないJacksonバージョン:2.9.4
実際のところ
Spark 2.1.0には、推移的な依存関係としてcom.fasterxml.jackson.coreが含まれています。したがって、libraryDependenciesに含める必要はありません。
同様の問題とその解決策について、ここで詳しく説明します。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加