我制作了使用 jdbc 从数据库读取的连接器,并从 Spark 应用程序中使用它。该应用程序很好地读取了数据库数据,但它只读取了前 10 行,似乎忽略了其余的行。我应该如何休息,以便我可以使用所有数据进行计算。
这是我的火花代码:
val brokers = "http://127.0.0.1:9092"
val topics = List("postgres-accounts2")
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
//sparkConf.setMaster("spark://sda1:7077,sda2:7077")
sparkConf.setMaster("local[2]")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[Record]))
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
// Create direct kafka stream with brokers and topics
//val topicsSet = topics.split(",")
val kafkaParams = Map[String, Object](
"schema.registry.url" -> "http://127.0.0.1:8081",
"bootstrap.servers" -> "http://127.0.0.1:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val messages = KafkaUtils.createDirectStream[String, Record](
ssc,
PreferConsistent,
Subscribe[String, Record](topics, kafkaParams)
)
val data = messages.map(record => {
println( record) // print only first 10
// compute here?
(record.key, record.value)
})
data.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
我认为问题在于Spark 是懒惰的,只会读取实际使用的数据。
默认情况下,print
将显示流中的前 10 个元素。由于代码除了这两个操作之外不包含任何其他操作,print
因此不需要读取超过 10 行的数据。尝试使用count
或其他操作来确认它正在工作。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句