我有一个RDD,就像((String, String), TimeStamp)
。我有大量的记录,我想为每个键选择具有最新TimeStamp值的记录。我尝试了以下代码,但仍在为此苦苦挣扎。有人可以帮我做到这一点吗?
我尝试的以下代码是错误的,并且无法正常工作
val context = sparkSession.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", "student_risk")
.option("user", "user")
.option("password", "password")
.load()
context.cache();
val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))
直接在DataFrame(context
在这里命名)很容易:
val result = context
.groupBy("course_id", "student_id")
.agg(min("risk_date_time") as "risk_date_time")
然后,您可以像以前一样将其转换为RDD(如果需要)-结果具有相同的架构。
如果您确实想通过RDD执行此操作,请使用reduceByKey
:
studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句