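I am trying to get the average rating across all the JSON objects in a file. I loaded the file and converted it to a DataFrame, but I get an error when computing the average. Sample requests: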
{
"country": "France",
"customerId": "France001",
"visited": [
{
"placeName": "US",
"rating": "2.3",
"famousRest": "N/A",
"placeId": "AVBS34"
},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "SeriousPie",
"placeId": "VBSs34"
},
{
"placeName": "Canada",
"rating": "4.3",
"famousRest": "TimHortons",
"placeId": "AVBv4d"
}
]
}
So for this JSON, the average rating for US is (2.3 + 3.3) / 2 = 2.8
{
"country": "Egypt",
"customerId": "Egypt009",
"visited": [
{
"placeName": "US",
"rating": "1.3",
"famousRest": "McDonald",
"placeId": "Dedcf3"
},
{
"placeName": "US",
"rating": "3.3",
"famousRest": "EagleNest",
"placeId": "CDfet3"
}
]
}
{
"country": "Canada",
"customerId": "Canada012",
"visited": [
{
"placeName": "UK",
"rating": "3.3",
"famousRest": "N/A",
"placeId": "XSdce2"
}
]
}
For the Egypt009 record, the average rating for US is (3.3 + 1.3) / 2 = 2.3
So overall, the average rating is (2.8 + 2.3) / 2 = 2.55 (only two requests have "US" in their visited list).
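The arithmetic above can be checked without Spark. The following is only a plain-Scala sketch of the definition in the question; the `Visit` and `Customer` case classes and the `UsRatingExample` object are illustrative names that do not appear in the original post, and only the fields needed for the calculation are modelled.

```scala
object UsRatingExample {
  // Plain-Scala stand-ins for the JSON records; ratings stay strings, as in the schema.
  final case class Visit(placeName: String, rating: String)
  final case class Customer(customerId: String, visited: Seq[Visit])

  val customers: Seq[Customer] = Seq(
    Customer("France001", Seq(Visit("US", "2.3"), Visit("US", "3.3"), Visit("Canada", "4.3"))),
    Customer("Egypt009", Seq(Visit("US", "1.3"), Visit("US", "3.3"))),
    Customer("Canada012", Seq(Visit("UK", "3.3")))
  )

  // Mean rating for one place within one customer; None if the place was never visited.
  def customerAverage(c: Customer, place: String): Option[Double] = {
    val ratings = c.visited.filter(_.placeName == place).map(_.rating.toDouble)
    if (ratings.isEmpty) None else Some(ratings.sum / ratings.size)
  }

  // Mean of the per-customer means, skipping customers without a matching visit.
  def overallAverage(place: String): Option[Double] = {
    val perCustomer = customers.flatMap(customerAverage(_, place))
    if (perCustomer.isEmpty) None else Some(perCustomer.sum / perCustomer.size)
  }

  def main(args: Array[String]): Unit =
    println(overallAverage("US")) // roughly 2.55, up to floating-point rounding
}
```

Customers with no matching visit (Canada012 here) are left out of the denominator, which matches the "only two requests" remark.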
My schema:
root
|-- country: string (nullable = true)
|-- customerId: string (nullable = true)
|-- visited: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- placeId: string (nullable = true)
| | |-- placeName: string (nullable = true)
| | |-- famousRest: string (nullable = true)
| | |-- rating: string (nullable = true)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt")
df.show()
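So basically, I need the average of the ratings where, for example, placeName = 'US'. AVG_RATING = sum of the ratings in each JSON object where placeName is US / count of such visited entries, and FINAL_VALUE = sum of AVG_RATING over all JSON objects that have placeName 'US' / count of JSON objects with placeName = 'US'.
So far I have tried: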
df.registerTempTable("people")
sqlContext.sql("select avg(expResults.rank) from people LATERAL VIEW explode(visited)people AS expResults where expResults.placeName = 'US' ").collect().foreach(println)
val result = df.select("*").where(array_contains(df("visited.placeName"), "US"))
This gives the rows whose visited array contains US, but I am not sure how to parse through the list of structs.
Can someone tell me how to do this?
You seem to want something like this:
import org.apache.spark.sql.functions.{avg, explode}
val result = df
.withColumn("visit", explode($"visited")) // Explode visits
.groupBy($"customerId", $"visit.placeName") // Group by using dot syntax
.agg(avg($"visit.rating".cast("double")).alias("tmp"))
.groupBy($"placeName").agg(avg($"tmp").alias("value"))
After that, you can filter for the place of your choice:
result.where($"placeName" === "US").show
// +---------+-----+
// |placeName|value|
// +---------+-----+
// | US| 2.55|
// +---------+-----+
A less elegant approach is to use a UDF:
import scala.util.Try
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
def userAverage(country: String) = udf((visits: Seq[Row]) => Try {
val filtered = visits
.filter(_.getAs[String]("placeName") == country)
.map(_.getAs[String]("rating").toDouble)
filtered.sum / filtered.size
}.toOption)
df.select(userAverage("US")($"visited").as("tmp")).na.drop.agg(avg("tmp"))
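To see why the `Try` and the `na.drop` are both there, it may help to run the UDF body outside Spark. This is a sketch only: it swaps `Seq[Row]` for plain `(placeName, rating)` pairs, and the `UdfBodySketch` object is an illustrative name, not part of the answer.

```scala
import scala.util.Try

object UdfBodySketch {
  // Same logic as the UDF body, but over (placeName, rating) pairs instead of Rows.
  def userAverage(country: String)(visits: Seq[(String, String)]): Option[Double] = Try {
    val filtered = visits
      .filter { case (place, _) => place == country }
      .map { case (_, rating) => rating.toDouble } // throws on non-numeric text
    filtered.sum / filtered.size // 0.0 / 0 is NaN, not an exception
  }.toOption

  def main(args: Array[String]): Unit = {
    println(userAverage("US")(Seq("US" -> "1.3", "US" -> "3.3")))
    println(userAverage("US")(Seq("UK" -> "3.3"))) // Some(NaN): no matching visits
    println(userAverage("US")(Seq("US" -> "n/a"))) // None: toDouble failed
  }
}
```

A customer with no matching visit produces NaN rather than a failure, and a rating that cannot be parsed produces `None` (a null in the DataFrame). `na.drop` removes rows containing either null or NaN, so neither case reaches the final `avg`.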
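Note: this follows the description in the question by computing an average of averages, which differs from the accepted answer. For a simple average: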
val result = df
.select(explode($"visited").alias("visit"))
.groupBy($"visit.placeName")
.agg(avg($"visit.rating".cast("double")))
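The two definitions happen to agree on the sample data, because both customers with US visits have exactly two of them. The plain-Scala sketch below illustrates this and shows how they diverge once the groups are unequal. The `MeanComparison` object and the extra customer with a single rating of 5.0 are hypothetical and not taken from the question.

```scala
object MeanComparison {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Mean over every rating, ignoring which customer it came from.
  def flatMean(groups: Seq[Seq[Double]]): Double = mean(groups.flatten)

  // Mean of each customer's mean, as defined in the question.
  def meanOfMeans(groups: Seq[Seq[Double]]): Double = mean(groups.map(mean))

  // US ratings per customer from the sample data (France001, Egypt009).
  val sample: Seq[Seq[Double]] = Seq(Seq(2.3, 3.3), Seq(1.3, 3.3))

  // Hypothetical extra customer with a single US rating of 5.0 (not in the source data).
  val unbalanced: Seq[Seq[Double]] = sample :+ Seq(5.0)

  def main(args: Array[String]): Unit = {
    println((flatMean(sample), meanOfMeans(sample)))         // both close to 2.55
    println((flatMean(unbalanced), meanOfMeans(unbalanced))) // no longer equal
  }
}
```

With the hypothetical third customer, the flat mean is 15.2 / 5 = 3.04 while the mean of means is 10.1 / 3, roughly 3.37, so the choice between the two queries matters as soon as customers have different numbers of matching visits.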