Kafka - 如何读取超过 10 行

完成

我制作了使用 jdbc 从数据库读取的连接器,并从 Spark 应用程序中使用它。该应用程序很好地读取了数据库数据,但它只读取了前 10 行,似乎忽略了其余的行。我应该如何休息,以便我可以使用所有数据进行计算。

这是我的火花代码:

val brokers = "http://127.0.0.1:9092"
val topics = List("postgres-accounts2")
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
//sparkConf.setMaster("spark://sda1:7077,sda2:7077")
sparkConf.setMaster("local[2]")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
sparkConf.registerKryoClasses(Array(classOf[Record]))

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

 // Create direct kafka stream with brokers and topics
//val topicsSet = topics.split(",")

val kafkaParams = Map[String, Object](
  "schema.registry.url" -> "http://127.0.0.1:8081",
  "bootstrap.servers" -> "http://127.0.0.1:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val messages = KafkaUtils.createDirectStream[String, Record](
  ssc,
  PreferConsistent,
  Subscribe[String, Record](topics, kafkaParams)
)

val data = messages.map(record => {
    println( record) // print only first 10
    // compute here?
    (record.key, record.value)
})

data.print()

// Start the computation
ssc.start()
ssc.awaitTermination()
夏道

我认为问题在于Spark 是懒惰的,只会读取实际使用的数据

默认情况下,print将显示流中的前 10 个元素。由于代码除了这两个操作之外不包含任何其他操作,print因此不需要读取超过 10 行的数据。尝试使用count或其他操作来确认它正在工作。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

在R中具有超过10行的目录中加入CSV文件

来自分类Dev

如果熊猫数据框具有超过10行,则将其分成两部分

来自分类Dev

TableView不显示超过10行

来自分类Dev

如何避免两个不同的线程从数据库读取相同的行(Hibernate和Oracle 10g)

来自分类Dev

如何分配10 ^ 6行的矩阵?

来自分类Dev

如何在Python中超过10,000行的文件中计算每个系统的系外行星?

来自分类Dev

如何从Wikipedia API获得超过默认的10个结果?

来自分类Dev

在Kafka中,如何处理源表中已反映在Kafka主题中的已删除行?

来自分类Dev

如何使读取文件超过特定行的条件?

来自分类Dev

如何使用Javascript获得中心的前10行,后3行和2行

来自分类Dev

在R中具有超过10行的目录中加入CSV文件

来自分类Dev

如何从Sql数据库中仅读取10行?

来自分类Dev

用户滚动超过10像素时如何运行功能?

来自分类Dev

读取前10行带有注释的文件

来自分类Dev

如果长度超过XY,如何删除行?

来自分类Dev

TableView不显示超过10行

来自分类Dev

从要匹配的模式超过10个的CSV文件中删除不需要的行

来自分类Dev

在Bash中,您将如何只读取超过特定时间戳的日志中的行?

来自分类Dev

如何在Microsoft Excel中填充大型系列(超过10,000行)而不拖动或选择单元格?

来自分类Dev

flink从kafka读取数据

来自分类Dev

如何从flink访问/读取kafka主题数据?

来自分类Dev

如何在文本文件中读取超过 2 行?C#

来自分类Dev

MySQL:如何获取最后 10 行而不是前 10 行

来自分类Dev

sed 命令删除包含超过 10 个字符且不以 91 开头的行

来自分类Dev

Windows 10 中的 Kafka 设置

来自分类Dev

dataTable 不能在表中显示超过 10 行

来自分类Dev

如何在 Access 中读取超过 255 个字符/行并包含控制字符的 TXT 文件?

来自分类Dev

Python Dataframe 删除出现超过 10 次特定值的行

来自分类Dev

如何读取所有数据单元格并仅突出显示超过 10 个字符的单元格

Related 相关文章

  1. 1

    在R中具有超过10行的目录中加入CSV文件

  2. 2

    如果熊猫数据框具有超过10行,则将其分成两部分

  3. 3

    TableView不显示超过10行

  4. 4

    如何避免两个不同的线程从数据库读取相同的行(Hibernate和Oracle 10g)

  5. 5

    如何分配10 ^ 6行的矩阵?

  6. 6

    如何在Python中超过10,000行的文件中计算每个系统的系外行星?

  7. 7

    如何从Wikipedia API获得超过默认的10个结果?

  8. 8

    在Kafka中,如何处理源表中已反映在Kafka主题中的已删除行?

  9. 9

    如何使读取文件超过特定行的条件?

  10. 10

    如何使用Javascript获得中心的前10行,后3行和2行

  11. 11

    在R中具有超过10行的目录中加入CSV文件

  12. 12

    如何从Sql数据库中仅读取10行?

  13. 13

    用户滚动超过10像素时如何运行功能?

  14. 14

    读取前10行带有注释的文件

  15. 15

    如果长度超过XY,如何删除行?

  16. 16

    TableView不显示超过10行

  17. 17

    从要匹配的模式超过10个的CSV文件中删除不需要的行

  18. 18

    在Bash中,您将如何只读取超过特定时间戳的日志中的行?

  19. 19

    如何在Microsoft Excel中填充大型系列(超过10,000行)而不拖动或选择单元格?

  20. 20

    flink从kafka读取数据

  21. 21

    如何从flink访问/读取kafka主题数据?

  22. 22

    如何在文本文件中读取超过 2 行?C#

  23. 23

    MySQL:如何获取最后 10 行而不是前 10 行

  24. 24

    sed 命令删除包含超过 10 个字符且不以 91 开头的行

  25. 25

    Windows 10 中的 Kafka 设置

  26. 26

    dataTable 不能在表中显示超过 10 行

  27. 27

    如何在 Access 中读取超过 255 个字符/行并包含控制字符的 TXT 文件?

  28. 28

    Python Dataframe 删除出现超过 10 次特定值的行

  29. 29

    如何读取所有数据单元格并仅突出显示超过 10 个字符的单元格

热门标签

归档