我正在尝试在Spark 1.6.1中使用SQLContext.subtract()基于另一个数据框中的列从数据框中删除行。让我们举个例子:
from pyspark.sql import Row
df1 = sqlContext.createDataFrame([
Row(name='Alice', age=2),
Row(name='Bob', age=1),
]).alias('df1')
df2 = sqlContext.createDataFrame([
Row(name='Bob'),
])
df1_with_df2 = df1.join(df2, 'name').select('df1.*')
df1_without_df2 = df1.subtract(df1_with_df2)
由于我希望所有df1
不包括在内的行都在name='Bob'
我的期望之中Row(age=2, name='Alice')
。但是我还检索了鲍勃:
print(df1_without_df2.collect())
# [Row(age='1', name='Bob'), Row(age='2', name='Alice')]
经过各种实验以了解此MCVE,我发现问题出在age
关键。如果我忽略它:
df1_noage = sqlContext.createDataFrame([
Row(name='Alice'),
Row(name='Bob'),
]).alias('df1_noage')
df1_noage_with_df2 = df1_noage.join(df2, 'name').select('df1_noage.*')
df1_noage_without_df2 = df1_noage.subtract(df1_noage_with_df2)
print(df1_noage_without_df2.collect())
# [Row(name='Alice')]
然后我只得到预期的爱丽丝。我所做的最奇怪的观察是,可以添加键,只要它们在连接中使用了键之后(按字典顺序的意义):
df1_zage = sqlContext.createDataFrame([
Row(zage=2, name='Alice'),
Row(zage=1, name='Bob'),
]).alias('df1_zage')
df1_zage_with_df2 = df1_zage.join(df2, 'name').select('df1_zage.*')
df1_zage_without_df2 = df1_zage.subtract(df1_zage_with_df2)
print(df1_zage_without_df2.collect())
# [Row(name='Alice', zage=2)]
我正确地得到了爱丽丝(和她的zage)!在我的实际示例中,我对所有列都感兴趣,而不仅仅是after之后的那些列name
。
好吧,这里有一些错误(第一个问题似乎与SPARK-6231涉及相同的问题),JIRA看起来是个好主意,但SUBTRACT
/EXCEPT
不是部分匹配的正确选择。
相反,从Spark 2.0开始,您可以使用anti-join:
df1.join(df1_with_df2, ["name"], "leftanti").show()
在1.6中,您可以使用标准外部联接执行几乎相同的操作:
import pyspark.sql.functions as F
ref = df1_with_df2.select("name").alias("ref")
(df1
.join(ref, ref.name == df1.name, "leftouter")
.filter(F.isnull("ref.name"))
.drop(F.col("ref.name")))
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句