我有一堆CSV文件,我读这些文件可以触发火花(与pyspark一起使用),在我想将它们在特定字段上加入一个大表之后。
问题是,此字段不是唯一的,但相关的属性是唯一的。数据的来源是唯一的,但是在我将其作为csv之前,这些信息已被删除。我无法使用联接查询的附加属性说明文件之间的连接。但是所有文件中出现的顺序都说明了结构。因此,如果我可以使人造ID具有文件中的ID和出现次数,它将起作用。
我的问题是,我可以定义一个SparkSQL查询(或其他pyspark方法),通过它为每个文件中的非唯一行添加连续计数,以便可以将其用于联接吗?
我想要的是:
ID| ct(ID) | generated_number
A | 2 | 1
A | 2 | 2
A | 2 | 3
B | 1 | 1
C | 2 | 1
C | 2 | 2
D | 1 | 1
E | 3 | 1
E | 3 | 2
E | 3 | 3
基于这一点,我可以创建一个新的ID,例如conc(ID,'_',generate_number)-至少我会对具有非唯一性的行进行处理。
是否有一个聪明的SparkNative版本,我真的不想在Shell脚本中修改源数据(我想这很糟糕)
非常感谢
解:
这两个答案都适合解决方案,非常感谢。我现在的方法如下:
SELECT ID,
row_number() OVER (
PARTITION BY ID
ORDER BY ID ) as row_count,
count(ID) OVER (
PARTITION BY ID
ORDER BY ID ) as count
FROM TB_TEMP AS main
WHERE cellname_s = "A"
好吧,不要使用WHERE子句,但是为了显示它,它更容易;)
这给了我想要的输出:
+----------+---------+-----+
| ID|row_count|count|
+----------+---------+-----+
| A| 1| 4|
| A| 2| 4|
| A| 3| 4|
| A| 4| 4|
+----------+---------+-----+
要获取我的唯一ID,我将
CASE WHEN count > 1 THEN concact(ID, "_", row_count) ELSE ID END AS ID
因此,为我提供了我没有但没有操纵过的唯一字段,这对处理数据的人员来说更好。
我认为您在这里要求一个row_number
使用类似
select id,CT(id),row_number() over(partition by id,CT(id) order by id) from ** your ** table
如果希望在Dataframe中使用它,则可以使用:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句