如何在Kafka Direct Stream中使用Spark结构化流?

谢尔盖·B

我遇到了带有Spark的结构化流,它提供了一个示例,该示例不断从S3存储桶中使用数据并将处理后的结果写入MySQL数据库。

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

如何与Spark Kafka Streaming一起使用

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

有没有办法在不使用的情况下合并这两个示例stream.foreachRDD(rdd => {})

尤瓦尔·伊茨恰科夫(Yuval Itzchakov)

有没有办法在不使用的情况下合并这两个示例stream.foreachRDD(rdd => {})

还没有。Spark 2.0.0不支持结构化流的Kafka接收器。根据Spark Streaming的创建者之一Tathagata Das的说法,此功能应在Spark 2.1.0中推出这是相关的JIRA问题

编辑:(29/11/2018)

是的,Spark 2.2及更高版本有可能。

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

有关更多信息,请查看此SO帖子(使用Spark Streaming在Kafka主题上进行读写)

编辑:(06/12/2016)

Spark 2.0.2现在支持基于结构流的Kafka 0.10集成

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Spark 3.0结构化流中使用kafka.group.id和检查点以继续从重启后停止的Kafka读取?

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

Spark结构化流-Kafka偏移处理

来自分类Dev

如何找到Spark结构化流应用程序的使用者组ID?

来自分类Dev

如何使用Airflow重新启动失败的结构化流Spark作业?

来自分类Dev

如何删除Spark结构化流创建的旧数据?

来自分类Dev

通过使用两个不同的Spark结构化流读取同一主题来调试Kafka管道

来自分类Dev

结构化流如何动态解析kafka的json数据

来自分类Dev

如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

来自分类Dev

在使用Kafka的Spark结构化流媒体中,Spark如何管理多个主题的偏移

来自分类Dev

如何使用Instagram Direct API

来自分类Dev

如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

来自分类Dev

如何将静态数据帧与Spark结构化流中的流数据进行比较?

来自分类Dev

Spark结构化流作业如何处理流-静态DataFrame连接?

来自分类Dev

具有SASL_SSL身份验证的Kafka Spark结构化流

来自分类Dev

从Kafka读取的Spark结构化流应用程序仅返回空值

来自分类Dev

带有 kafka 的 Spark 结构化流只导致一批(Pyspark)

来自分类Dev

如何在结构化流中正确使用foreachBatch.batchDF.unpersist()?(保持错误)

来自分类Dev

我们如何在Spark结构化流中管理偏移量?(_spark_metadata问题)

来自分类Dev

从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

来自分类Dev

Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

来自分类Dev

使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

来自分类Dev

如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

来自分类Dev

查询开始时,使用结构化流从Kafka主题的开头进行读取

来自分类Dev

如何初始化Ext.Direct?

来自分类Dev

Spark Streaming Direct Kafka API,OffsetRanges:如何处理首次运行

来自分类Dev

如何在结构化查询中使用scikit学习模型?

来自分类Dev

从 Kafka 读取时 Pyspark 结构化流中的异常

来自分类Dev

带有结构化流协议的 Apache Kafka

Related 相关文章

  1. 1

    如何在Spark 3.0结构化流中使用kafka.group.id和检查点以继续从重启后停止的Kafka读取?

  2. 2

    Spark结构化流-Kafka偏移处理

  3. 3

    Spark结构化流-Kafka偏移处理

  4. 4

    如何找到Spark结构化流应用程序的使用者组ID?

  5. 5

    如何使用Airflow重新启动失败的结构化流Spark作业?

  6. 6

    如何删除Spark结构化流创建的旧数据?

  7. 7

    通过使用两个不同的Spark结构化流读取同一主题来调试Kafka管道

  8. 8

    结构化流如何动态解析kafka的json数据

  9. 9

    如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

  10. 10

    在使用Kafka的Spark结构化流媒体中,Spark如何管理多个主题的偏移

  11. 11

    如何使用Instagram Direct API

  12. 12

    如何使用(Py)Spark结构化流为带有时间戳(来自Kafka)的JSON记录定义架构?-显示空值

  13. 13

    如何将静态数据帧与Spark结构化流中的流数据进行比较?

  14. 14

    Spark结构化流作业如何处理流-静态DataFrame连接?

  15. 15

    具有SASL_SSL身份验证的Kafka Spark结构化流

  16. 16

    从Kafka读取的Spark结构化流应用程序仅返回空值

  17. 17

    带有 kafka 的 Spark 结构化流只导致一批(Pyspark)

  18. 18

    如何在结构化流中正确使用foreachBatch.batchDF.unpersist()?(保持错误)

  19. 19

    我们如何在Spark结构化流中管理偏移量?(_spark_metadata问题)

  20. 20

    从Kafka主题读取流时,Spark结构化流是否存在一些超时问题?

  21. 21

    Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

  22. 22

    使用Spark结构化流(pyspark)从Kafka Connect JSONConverter消息中提取“有效负载”(模式和有效负载)

  23. 23

    如何在Spark结构化流中基于时间戳字段重复数据删除并保持最新?

  24. 24

    查询开始时,使用结构化流从Kafka主题的开头进行读取

  25. 25

    如何初始化Ext.Direct?

  26. 26

    Spark Streaming Direct Kafka API,OffsetRanges:如何处理首次运行

  27. 27

    如何在结构化查询中使用scikit学习模型?

  28. 28

    从 Kafka 读取时 Pyspark 结构化流中的异常

  29. 29

    带有结构化流协议的 Apache Kafka

热门标签

归档