Spark 1.6 Streaming consumer reading in kafka offset stuck at createDirectStream

gimp770

I am trying to read stored Kafka offsets into my Spark Streaming consumer, but I cannot seem to do it correctly.

Here is my code.

val dfoffset = hiveContext.sql(s"select * from $db")
dfoffset.show()
val dfoffsetArray = dfoffset.collect()
println("printing array of data")
dfoffsetArray.foreach(println)
val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
for (i <- dfoffsetArray) {
  val topicAndPartition = (TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong))
  fromOffsets += topicAndPartition
}

val kafkaParams = Map[String, String]("bootstrap.servers" -> serverName, "group.id" -> "test")
val topics = Array(topicName).toSet
//stuck here 
var directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

directKafkaStream.foreachRDD(rdd1 => { ..

Here is the output from showing the dataframe:

+----------------+----------+--------------+
|partition_number|topic_name|current_offset|
+----------------+----------+--------------+
|               0|TOPIC_NAME|          4421|
+----------------+----------+--------------+

Any help is greatly appreciated.

I am using Spark 1.6, Scala 2.10.5, Kafka 0.10.

Ire

As shown in the official documentation for KafkaUtils.createDirectStream, you should pass fromOffsets as the third parameter of createDirectStream (and don't forget the fourth parameter, messageHandler).

The fromOffsets parameter is supposed to be a collection.immutable.Map[TopicAndPartition, Long]; in Scala we generally prefer immutable collections over mutable ones wherever possible.
You can transform dfoffsetArray into an immutable.Map[TopicAndPartition, Long] as follows:

val fromOffsets = dfoffsetArray.map( i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> (i(2).toString.toLong)
).toMap
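To see this transformation in isolation, here is a minimal plain-Scala sketch (no Spark or Kafka dependency needed). The mock rows and the stand-in TopicAndPartition case class are assumptions for illustration only; in your job the rows come from dfoffset.collect() and TopicAndPartition comes from the kafka package:

```scala
// Stand-in for kafka.common.TopicAndPartition so this sketch
// compiles without any Kafka/Spark dependency (assumption).
case class TopicAndPartition(topic: String, partition: Int)

// Mock rows mirroring dfoffset.collect():
// index 0 = partition_number, 1 = topic_name, 2 = current_offset
val dfoffsetArray: Array[Seq[Any]] = Array(Seq(0, "TOPIC_NAME", 4421L))

// Same transformation as above: build an immutable Map in one pass.
val fromOffsets: Map[TopicAndPartition, Long] = dfoffsetArray.map { i =>
  TopicAndPartition(i(1).toString, i(0).toString.toInt) -> i(2).toString.toLong
}.toMap

println(fromOffsets) // Map(TopicAndPartition(TOPIC_NAME,0) -> 4421)
```

The map-then-toMap form replaces the mutable-map-plus-loop pattern from the question with a single expression that yields the immutable type createDirectStream expects.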

And messageHandler has the type (MessageAndMetadata[K, V]) ⇒ R, which lets you work with the key and value of each message. You can define a simple handler as follows:

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

Then your createDirectStream call will look like this:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

Now you are free to apply transformations to your stream. Happy streaming!


I learned this approach from this article months ago. Maybe you will find it helpful.
