我遇到了带有Spark的结构化流,它提供了一个示例,该示例不断从S3存储桶中使用数据并将处理后的结果写入MySQL数据库。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
如何与Spark Kafka Streaming一起使用?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有办法在不使用的情况下合并这两个示例stream.foreachRDD(rdd => {})
?
有没有办法在不使用的情况下合并这两个示例
stream.foreachRDD(rdd => {})
?
还没有。Spark 2.0.0不支持结构化流的Kafka接收器。根据Spark Streaming的创建者之一Tathagata Das的说法,此功能应在Spark 2.1.0中推出。这是相关的JIRA问题。
是的,Spark 2.2及更高版本有可能。
stream
.writeStream // use `write` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("topic", "target-topic1")
.start()
有关更多信息,请查看此SO帖子(使用Spark Streaming在Kafka主题上进行读写)。
Spark 2.0.2现在支持基于结构流的Kafka 0.10集成:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句