I have a Spark DataFrame like this:
val myDF = Seq(
(1,"A",100,0,0),
(1,"E",200,0,0),
(1,"",300,1,49),
(2,"A",200,0,0),
(2,"C",300,0,0),
(2,"D",100,0,0)
).toDF("visitor","channel","timestamp","purchase_flag","amount")
scala> myDF.show
+-------+-------+---------+-------------+------+
|visitor|channel|timestamp|purchase_flag|amount|
+-------+-------+---------+-------------+------+
|      1|      A|      100|            0|     0|
|      1|      E|      200|            0|     0|
|      1|       |      300|            1|    49|
|      2|      A|      200|            0|     0|
|      2|      C|      300|            0|     0|
|      2|      D|      100|            0|     0|
+-------+-------+---------+-------------+------+
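From myDF I want to build a sequence DataFrame with one row per visitor that traces the visitor's path to purchase, ordered by timestamp. The output DataFrame should look like this (-> can be any delimiter):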
+-------+--------------------+
|visitor|channel sequence    |
+-------+--------------------+
|1      |A->E->purchase      |
|2      |D->A->C->no_purchase|
+-------+--------------------+
To clarify: visitor 2 was exposed to channel D, then A, then C, and did not make a purchase. The sequence is therefore D->A->C->no_purchase.
Note: whenever a purchase happens, the channel value is blank and purchase_flag is set to 1.
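To make the rule concrete, here is a minimal plain-Scala sketch (no Spark involved) that applies it to the same rows held in a local Seq. The names `rows` and `sequences` are illustrative only; the tuple layout is assumed to mirror the columns (visitor, channel, timestamp, purchase_flag, amount).

```scala
// Local stand-in for the rows of myDF:
// (visitor, channel, timestamp, purchase_flag, amount)
val rows: Seq[(Int, String, Int, Int, Int)] = Seq(
  (1, "A", 100, 0, 0),
  (1, "E", 200, 0, 0),
  (1, "", 300, 1, 49),
  (2, "A", 200, 0, 0),
  (2, "C", 300, 0, 0),
  (2, "D", 100, 0, 0)
)

// For each visitor: order events by timestamp, drop the blank channel of the
// purchase row, then append the outcome derived from purchase_flag.
val sequences: Map[Int, String] = rows.groupBy(_._1).map { case (visitor, events) =>
  val path = events.sortBy(_._3).map(_._2).filterNot(_.isEmpty)
  val outcome = if (events.exists(_._4 == 1)) "purchase" else "no_purchase"
  visitor -> (path :+ outcome).mkString("->")
}

// Print one line per visitor, sorted by visitor id
sequences.toSeq.sortBy(_._1).foreach { case (v, s) => println(s"$v\t$s") }
```

This prints `1	A->E->purchase` and `2	D->A->C->no_purchase`, matching the desired table; a Spark version has to do the same three steps (sort, filter, append outcome) per group.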
I would like to do this with a Scala UDF in Spark so that I can reapply the method to other datasets.
Here is one way to do it with the udf function:
val myDF = Seq(
(1,"A",100,0,0),
(1,"E",200,0,0),
(1,"",300,1,49),
(2,"A",200,0,0),
(2,"C",300,0,0),
(2,"D",100,0,0)
).toDF("visitor","channel","timestamp","purchase_flag","amount")
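import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// Sort each visitor's (channel, timestamp) pairs by timestamp, drop the blank
// channel of the purchase row, and append the purchase outcome
val sequenceUdf = udf((struct: Seq[Row], purchased: Seq[Int]) => {
  val path = struct
    .map(row => (row.getAs[String]("channel"), row.getAs[Int]("timestamp")))
    .sortBy(_._2) // collect_list gives no ordering guarantee, so sort explicitly
    .map(_._1)
    .filterNot(_ == "")
  val outcome = if (purchased.contains(1)) "purchase" else "no_purchase"
  (path :+ outcome).mkString("->")
})
myDF.groupBy("visitor").agg(collect_list(struct("channel", "timestamp")).as("struct"), collect_list("purchase_flag").as("purchased"))
.select(col("visitor"), sequenceUdf(col("struct"), col("purchased")).as("channel sequence"))
.show(false)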
This should give you:
+-------+--------------------+
|visitor|channel sequence    |
+-------+--------------------+
|1      |A->E->purchase      |
|2      |D->A->C->no_purchase|
+-------+--------------------+
You can make this as generic as you need; it is only a demonstration of how to approach the problem.
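One way to make it more generic, sketched under the assumption that you want the delimiter and outcome labels configurable: move the UDF body into a plain Scala function. `buildSequence` and its parameter names are hypothetical, not part of Spark.

```scala
// Hypothetical helper: builds the sequence string from (channel, timestamp)
// pairs plus the collected purchase flags, with configurable delimiter and labels.
def buildSequence(
    events: Seq[(String, Int)],
    purchaseFlags: Seq[Int],
    sep: String = "->",
    purchaseLabel: String = "purchase",
    noPurchaseLabel: String = "no_purchase"
): String = {
  // Order by timestamp and drop blank/null channels (the purchase rows)
  val path = events.sortBy(_._2).map(_._1).filter(c => c != null && c.nonEmpty)
  val outcome = if (purchaseFlags.contains(1)) purchaseLabel else noPurchaseLabel
  (path :+ outcome).mkString(sep)
}

// Example: visitor 2's events with a different delimiter
println(buildSequence(Seq(("A", 200), ("C", 300), ("D", 100)), Seq(0, 0, 0), sep = " > "))
```

Spark passes an array of structs to a Scala UDF as `Seq[Row]`, not as tuples, so the UDF would still convert each Row first, e.g. `udf((s: Seq[Row], p: Seq[Int]) => buildSequence(s.map(r => (r.getAs[String]("channel"), r.getAs[Int]("timestamp"))), p))`. Keeping the logic outside the UDF also lets you unit-test it without a SparkSession.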