Scala Spark - groupBy to find averages across months within a date range

Nordin

I'm looking at this dataset of drone hires. I would like to try a groupBy in Spark on the "Result" column to show the average result ($) for each drone, based on the number of days it spent in each month.

I.e. the value in the Result column is divided by the total number of days, and then attributed to each month between the start and end dates according to how many of those days fall in it.
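For example, a hand calculation on the first DR1 row (16 Jun to 22 Aug 2013, Result = 2786, 67 days in total; the 15/31/21 day counts are simply what the calendar gives for that span):

```scala
// Worked example for the first DR1 row: 16/06/2013 – 22/08/2013, Result = 2786
val totalDays = 67                    // total days between start and end
val perDay    = 2786.0 / totalDays    // daily rate ≈ 41.58
val juneAvg   = 15 * perDay           // 15 days fall in June   → ≈ 623.73
val julyAvg   = 31 * perDay           // 31 days fall in July   → ≈ 1289.04
val augAvg    = 21 * perDay           // 21 days fall in August → ≈ 873.22
```

The three monthly shares add back up to the original 2786.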

+------+------------------+------------------+--------+
| Drone| Start            | End              | Result |
+------+------------------+------------------+--------+
| DR1  | 16/06/2013 10:30 | 22/08/2013 07:00 | 2786   |
| DR1  | 20/04/2013 23:30 | 16/06/2013 10:30 | 7126   |
| DR1  | 24/01/2013 23:00 | 20/04/2013 23:30 | 2964   |
| DR2  | 01/03/2014 19:00 | 07/05/2014 18:00 | 8884   |
| DR2  | 04/09/2015 09:00 | 04/11/2015 07:00 | 7828   |
| DR2  | 04/10/2013 05:00 | 24/12/2013 07:00 | 5700   |
+------+------------------+------------------+--------+

This is tricky because it is a long-duration hire business rather than a value tied to a single date, so a simple groupBy doesn't work for me.

Note that in the full dataset drones are hired down to the minute, and it is a bit messy.

I would really appreciate help with the right thought process for tackling this kind of problem, and what the code might look like.

How would you change what I've written below to treat each month as a separate case? (I can only group on the start date) :/

val df_avgs = df.groupBy("Start").mean()
df_avgs.select($"Date",$"avg(Result)").show()

Taking the first example row for each drone, my expected output is:

+------+--------+------+------+
|Drone | Month  | Days | Avg  |
+------+--------+------+------+
|DR1   | June   | X    | $YY  |
|DR1   | July   | X    | $YY  |
|DR1   | August | X    | $YY  |
|DR2   | March  | Y    | $ZZ  |
|DR2   | April  | Y    | $ZZ  |
|DR2   | May    | Y    | $ZZ  |
+------+--------+------+------+

Many thanks

stack0114106

Can you check this out? I used "MMM-yy" as the date format so that if the start and end dates span years, the months are easy to tell apart. If you only need the month, you can change it to "MMM".
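To illustrate the format choice with plain `java.time` (a standalone sketch; the sample data has DR2 hires in both Nov 2013 and Nov 2015, which a bare "MMM" would merge into one group):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

// "MMM-yy" keeps the year, so the same month in different years stays distinct
val fmt = DateTimeFormatter.ofPattern("MMM-yy", Locale.ENGLISH)
val nov2013 = fmt.format(LocalDate.of(2013, 11, 1)) // "Nov-13"
val nov2015 = fmt.format(LocalDate.of(2015, 11, 1)) // "Nov-15"
```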

scala> val df_t = Seq(("DR1","16/06/2013 10:30","22/08/2013 07:00",2786),("DR1","20/04/2013 23:30","16/06/2013 10:30",7126),("DR1","24/01/2013 23:00","20/04/2013 23:30",2964),("DR2","01/03/2014 19:00","07/05/2014 18:00",8884),("DR2","04/09/2015 09:00","04/11/2015 07:00",7828),("DR2","04/10/2013 05:00","24/12/2013 07:00",5700)).toDF("drone","start","end","result")
df_t: org.apache.spark.sql.DataFrame = [drone: string, start: string ... 2 more fields]

scala> val df = df_t.withColumn("start",to_timestamp('start,"dd/MM/yyyy HH:mm")).withColumn("end",to_timestamp('end,"dd/MM/yyyy HH:mm"))
df: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 2 more fields]

scala> df.show(false)
+-----+-------------------+-------------------+------+
|drone|start              |end                |result|
+-----+-------------------+-------------------+------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |
+-----+-------------------+-------------------+------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a: java.sql.Date, b: java.sql.Date): Seq[String] = {
  import java.time._
  import java.time.format._
  val start = a.toLocalDate
  val end = b.toLocalDate
  // Walk every day in [start, end) and collect the distinct "MMM-yy" labels
  (start.toEpochDay until end.toEpochDay)
    .map(LocalDate.ofEpochDay(_))
    .map(DateTimeFormatter.ofPattern("MMM-yy").format(_))
    .toSet.toSeq
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)Seq[String]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Seq[String] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(DateType, DateType)))

scala> val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> df2.show(false)
+-----+-------------------+-------------------+------+----+--------------------------------+
|drone|start              |end                |result|days|diff_months                     |
+-----+-------------------+-------------------+------+----+--------------------------------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |67  |[Jun-13, Jul-13, Aug-13]        |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |57  |[Apr-13, May-13, Jun-13]        |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |86  |[Jan-13, Feb-13, Mar-13, Apr-13]|
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |67  |[Mar-14, Apr-14, May-14]        |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |61  |[Sep-15, Oct-15, Nov-15]        |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |81  |[Oct-13, Nov-13, Dec-13]        |
+-----+-------------------+-------------------+------+----+--------------------------------+


scala> df2.withColumn("month",explode('diff_months)).withColumn("Avg",'result/'days).select("drone","month","days","avg").show(false)
+-----+------+----+------------------+
|drone|month |days|avg               |
+-----+------+----+------------------+
|DR1  |Jun-13|67  |41.582089552238806|
|DR1  |Jul-13|67  |41.582089552238806|
|DR1  |Aug-13|67  |41.582089552238806|
|DR1  |Apr-13|57  |125.01754385964912|
|DR1  |May-13|57  |125.01754385964912|
|DR1  |Jun-13|57  |125.01754385964912|
|DR1  |Jan-13|86  |34.46511627906977 |
|DR1  |Feb-13|86  |34.46511627906977 |
|DR1  |Mar-13|86  |34.46511627906977 |
|DR1  |Apr-13|86  |34.46511627906977 |
|DR2  |Mar-14|67  |132.59701492537314|
|DR2  |Apr-14|67  |132.59701492537314|
|DR2  |May-14|67  |132.59701492537314|
|DR2  |Sep-15|61  |128.327868852459  |
|DR2  |Oct-15|61  |128.327868852459  |
|DR2  |Nov-15|61  |128.327868852459  |
|DR2  |Oct-13|81  |70.37037037037037 |
|DR2  |Nov-13|81  |70.37037037037037 |
|DR2  |Dec-13|81  |70.37037037037037 |
+-----+------+----+------------------+


scala>
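The core of the UDF can be sanity-checked outside Spark. Below is a standalone sketch of the same epoch-day walk on plain `LocalDate` values (the `monthsBetween` name is mine; `distinct` replaces `toSet.toSeq` so calendar order is preserved, and the locale is pinned so the labels are stable):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Same logic as the months_range UDF, but on LocalDate directly (no Spark needed)
def monthsBetween(start: LocalDate, end: LocalDate): Seq[String] = {
  val fmt = DateTimeFormatter.ofPattern("MMM-yy", java.util.Locale.ENGLISH)
  (start.toEpochDay until end.toEpochDay)   // one entry per rental day, end exclusive
    .map(LocalDate.ofEpochDay(_))
    .map(fmt.format(_))
    .distinct
}

val months = monthsBetween(LocalDate.of(2013, 6, 16), LocalDate.of(2013, 8, 22))
// Seq("Jun-13", "Jul-13", "Aug-13") — matching the first DR1 row above
```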

EDIT 1

Splitting the result by the number of days that fall in each month. The UDF code had to be changed as below.

scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a: java.sql.Date, b: java.sql.Date) = {
  import java.time._
  import java.time.format._
  val start = a.toLocalDate
  val end = b.toLocalDate
  // Count how many days of [start, end) fall in each "MMM-yy" month
  (start.toEpochDay until end.toEpochDay)
    .map(LocalDate.ofEpochDay(_))
    .map(DateTimeFormatter.ofPattern("MMM-yy").format(_))
    .groupBy(identity)
    .map(x => (x._1, x._2.length))
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)scala.collection.immutable.Map[String,Int]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Map[String,Int] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,IntegerType,false),Some(List(DateType, DateType)))

scala>  val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> val df3=df2.select(col("*"),explode('diff_months).as(Seq("month","month_days")) ).withColumn("mnth_rent",'result*('month_days/'days)).select("drone","month","month_days","days","mnth_rent")
df3: org.apache.spark.sql.DataFrame = [drone: string, month: string ... 3 more fields]

scala> df3.show(false)
+-----+------+----------+----+------------------+
|drone|month |month_days|days|mnth_rent         |
+-----+------+----------+----+------------------+
|DR1  |Aug-13|21        |67  |873.223880597015  |
|DR1  |Jul-13|31        |67  |1289.044776119403 |
|DR1  |Jun-13|15        |67  |623.7313432835821 |
|DR1  |May-13|31        |57  |3875.543859649123 |
|DR1  |Apr-13|11        |57  |1375.1929824561403|
|DR1  |Jun-13|15        |57  |1875.2631578947367|
|DR1  |Apr-13|19        |86  |654.8372093023256 |
|DR1  |Feb-13|28        |86  |965.0232558139536 |
|DR1  |Mar-13|31        |86  |1068.4186046511627|
|DR1  |Jan-13|8         |86  |275.72093023255815|
|DR2  |Apr-14|30        |67  |3977.910447761194 |
|DR2  |Mar-14|31        |67  |4110.507462686567 |
|DR2  |May-14|6         |67  |795.5820895522388 |
|DR2  |Nov-15|3         |61  |384.983606557377  |
|DR2  |Oct-15|31        |61  |3978.1639344262294|
|DR2  |Sep-15|27        |61  |3464.8524590163934|
|DR2  |Nov-13|30        |81  |2111.111111111111 |
|DR2  |Oct-13|28        |81  |1970.3703703703702|
|DR2  |Dec-13|23        |81  |1618.5185185185185|
+-----+------+----------+----+------------------+


scala> df3.groupBy('drone,'month).agg(sum('month_days).as("s_month_days"),sum('mnth_rent).as("mnth_rent"),max('days).as("days")).orderBy('drone,'month).show(false)
+-----+------+------------+------------------+----+
|drone|month |s_month_days|mnth_rent         |days|
+-----+------+------------+------------------+----+
|DR1  |Apr-13|30          |2030.030191758466 |86  |
|DR1  |Aug-13|21          |873.223880597015  |67  |
|DR1  |Feb-13|28          |965.0232558139536 |86  |
|DR1  |Jan-13|8           |275.72093023255815|86  |
|DR1  |Jul-13|31          |1289.044776119403 |67  |
|DR1  |Jun-13|30          |2498.994501178319 |67  |
|DR1  |Mar-13|31          |1068.4186046511627|86  |
|DR1  |May-13|31          |3875.543859649123 |57  |
|DR2  |Apr-14|30          |3977.910447761194 |67  |
|DR2  |Dec-13|23          |1618.5185185185185|81  |
|DR2  |Mar-14|31          |4110.507462686567 |67  |
|DR2  |May-14|6           |795.5820895522388 |67  |
|DR2  |Nov-13|30          |2111.111111111111 |81  |
|DR2  |Nov-15|3           |384.983606557377  |61  |
|DR2  |Oct-13|28          |1970.3703703703702|81  |
|DR2  |Oct-15|31          |3978.1639344262294|61  |
|DR2  |Sep-15|27          |3464.8524590163934|61  |
+-----+------+------------+------------------+----+


scala>
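The Map-returning variant can likewise be checked outside Spark. A standalone sketch (the `monthDayCounts` name is mine, with the locale pinned for stable labels):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Day counts per "MMM-yy" month over [start, end), mirroring the EDIT 1 UDF
def monthDayCounts(start: LocalDate, end: LocalDate): Map[String, Int] = {
  val fmt = DateTimeFormatter.ofPattern("MMM-yy", java.util.Locale.ENGLISH)
  (start.toEpochDay until end.toEpochDay)
    .map(LocalDate.ofEpochDay(_))
    .map(fmt.format(_))
    .groupBy(identity)
    .map { case (m, ds) => (m, ds.length) }
}

val counts = monthDayCounts(LocalDate.of(2013, 6, 16), LocalDate.of(2013, 8, 22))
// Map("Jun-13" -> 15, "Jul-13" -> 31, "Aug-13" -> 21); the counts sum to 67,
// the same value datediff gives for this row
```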
