Spark with kafka: NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

rigby

I'm trying to run a Java Kafka consumer for Spark, and no matter what I do I get the exception below. The stack trace points to (ConsumerStrategy.scala:85). Why does it say Scala here? Does this mean it uses Scala methods instead of Java ones? Are any of my libraries conflicting?

My pom:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

My code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaTest");
        // sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(1));

        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark_group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // the 0.10+ consumer expects an assignor class name here, not "range"
        kafkaParams.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RangeAssignor");

        System.out.println("Hello1");
        Collection<String> topics = Arrays.asList("spark");
        System.out.println("Hello2");
        ConsumerStrategy<String, String> cons = ConsumerStrategies.Subscribe(topics, kafkaParams);

        JavaInputDStream<ConsumerRecord<String, String>> messages =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        cons);

        messages.foreachRDD(rdd -> {
            // printf uses %s, not the SLF4J-style {} placeholder
            System.out.printf("Message received: %s%n", rdd);
        });

        // start the streaming job and block; the exception below is thrown
        // from the "streaming-start" thread spawned here
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
I ran it with:

spark-submit --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar --class Main spark-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

(I also tried without --jars, and with version 2.4.5 of that library.)

and got this exception:

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:85)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:259)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I tried export SPARK_KAFKA_VERSION=0.10 and also tried adding kafka-clients 0.10.2.1, but I still get the same result.
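One way to see which jar the KafkaConsumer class is actually being loaded from at runtime is to print its code source. A minimal diagnostic sketch using the standard ProtectionDomain API (the class name WhichJar is just illustrative):

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.security.CodeSource;

public class WhichJar {
    public static void main(String[] args) {
        // Prints the jar KafkaConsumer was loaded from; getCodeSource()
        // may return null for classes on the bootstrap classpath.
        CodeSource src = KafkaConsumer.class.getProtectionDomain().getCodeSource();
        System.out.println(src != null ? src.getLocation() : "bootstrap classpath");
    }
}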

rigby

The problem was that there is another Kafka library on the Spark machine: /opt/hadoop/share/hadoop/tools/lib/kafka-clients-0.8.2.1.jar. To override it, I used the Maven Shade plugin to relocate the Kafka classes (see the sketch below); nothing else worked for me. See this link for details: https://medium.com/@minyodev/relocating-classes-using-apache-maven-shade-plugin-6957a1a8666d
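For reference, a minimal sketch of such a relocation setup, which goes in the <build><plugins> section of the pom (the shaded.org.apache.kafka prefix is just an illustrative choice):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <!-- Rewrite org.apache.kafka inside the uber jar so it
                         cannot clash with kafka-clients-0.8.2.1.jar
                         already present on the cluster -->
                    <relocation>
                        <pattern>org.apache.kafka</pattern>
                        <shadedPattern>shaded.org.apache.kafka</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

After mvn package, the shaded jar contains relocated copies of the Kafka classes, so the old kafka-clients jar in the Hadoop tools directory can no longer shadow them.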
