Actually, I have an RDD containing some protein names and their domains. I used the cartesian function to generate the possible pairs. As a result, I unfortunately end up with duplicated pairs (each pair appears once in each order). How can I keep just one tuple per pair and remove the duplicates? Here is an example:
+------------------------------------+------------------------------------+
| Protein1 | Protein2 |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890) |
|(P0CW05,IPR004372;IPR000890) |(P0C2L1,IPR0179) |
|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266)|
|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104) |
+------------------------------------+------------------------------------+
I would like to get:
+------------------------------------+------------------------------------+
| Protein1 | Protein2 |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890) |
|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266)|
+------------------------------------+------------------------------------+
I have assumed the input data based on the information provided, and implemented the solution below.
Here is what it does:
Note: this approach requires that `length(Protein1) > length(Protein2)` holds for the pairs that need reordering. If the OP provides more clarity about the input data, more solutions become possible.
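Before walking through the Spark version, the core idea can be illustrated without a cluster. The sketch below (plain Scala, using the OP's sample strings; the object name `LengthCanonical` is made up for this example) applies the same length-based rule the SQL uses: reorder each pair so the shorter string comes first, then let `distinct` collapse the duplicates. The same `map` + `distinct` calls exist on an `RDD[(String, String)]`.

```scala
// Plain-Scala sketch of the length-based canonicalization used below.
// Each symmetric pair is reordered so the shorter string comes first;
// after that, the two orientations of a pair become identical tuples
// and collapse under distinct.
object LengthCanonical {
  def canonicalize(p: (String, String)): (String, String) =
    if (p._1.length > p._2.length) (p._2, p._1) else p

  def main(args: Array[String]): Unit = {
    val pairs = Seq(
      ("P0C2L1,IPR0179", "P0CW05,IPR004372;IPR000890"),
      ("P0CW05,IPR004372;IPR000890", "P0C2L1,IPR0179"),
      ("B2UDV1,IPR0104", "Q4R8P0,IPR029058;IPR000073;IPR0266"),
      ("Q4R8P0,IPR029058;IPR000073;IPR0266", "B2UDV1,IPR0104")
    )
    // 4 input rows -> 2 canonical pairs
    val dedup = pairs.map(canonicalize).distinct
    dedup.foreach(println)
  }
}
```

Like the SQL below, this relies on the two sides of a pair having different lengths; two equal-length strings would not be reordered consistently.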
//Creating the paired RDD as provided by OP
var x: RDD[(String, String)] = sc.parallelize(Seq(
  ("P0C2L1,IPR0179", "P0CW05,IPR004372;IPR000890"),
  ("P0CW05,IPR004372;IPR000890", "P0C2L1,IPR0179"),
  ("B2UDV1,IPR0104", "Q4R8P0,IPR029058;IPR000073;IPR0266"),
  ("Q4R8P0,IPR029058;IPR000073;IPR0266", "B2UDV1,IPR0104")
))
//Creating a Spark DataFrame out of this RDD
var combDF = spark.createDataFrame(x).toDF("Protein1","Protein2")
combDF.show(20,false)
//+------------------------------------+------------------------------------+
//|Protein1 |Protein2 |
//+------------------------------------+------------------------------------+
//|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890) |
//|(P0CW05,IPR004372;IPR000890) |(P0C2L1,IPR0179) |
//|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266)|
//|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104) |
//+------------------------------------+------------------------------------+
// creating temporary views
combDF.createOrReplaceTempView("combDF")
// The statement below is only needed in this example, to cast the string columns to structs
combDF = spark.sql("""select named_struct("col1", element_at(split(Protein1,","),1), "col2", element_at(split(Protein1,","),2)) as Protein1,
named_struct("col1", element_at(split(Protein2,","),1), "col2", element_at(split(Protein2,","),2)) as Protein2
from combDF""")
//end
combDF.createOrReplaceTempView("combDF")
combDF.show()
var result = spark.sql("""
|select case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 2)
| else element_at(protein_array, 1)
| end as Protein1,
| case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 1)
| else element_at(protein_array, 2)
| end as Protein2
|from
|(select Protein1, Protein2, cast(Protein1 as string) as Protein1_m, cast(Protein2 as string) as Protein2_m,
| array(Protein1,Protein2) as protein_array
|from combDF) a
""".stripMargin).dropDuplicates()
// Result in spark dataframe
result.show(20,false)
//+-----------------+-------------------------------------+
//|Protein1 |Protein2 |
//+-----------------+-------------------------------------+
//|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266) |
//|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890) |
//+-----------------+-------------------------------------+
// result in RDD
var resultRDD = result.rdd
resultRDD.collect().foreach(println)
//[(B2UDV1,IPR0104),(Q4R8P0,IPR029058;IPR000073;IPR0266)]
//[(P0C2L1,IPR0179),(P0CW05,IPR004372;IPR000890)]
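As the note above says, more solutions are possible. One variant worth mentioning: instead of comparing lengths, pick the canonical order lexicographically, which avoids any ambiguity when the two strings happen to have equal length. A minimal sketch on plain tuples (the object name `LexCanonical` is made up; the same `map` + `distinct` applies to an `RDD[(String, String)]`):

```scala
// Sketch of a lexicographic canonical order: the smaller string always
// goes first, so both orientations of a pair map to the same tuple.
object LexCanonical {
  def canonicalize(p: (String, String)): (String, String) =
    if (p._1 <= p._2) p else p.swap

  def main(args: Array[String]): Unit = {
    val pairs = Seq(
      ("B2UDV1,IPR0104", "Q4R8P0,IPR029058;IPR000073;IPR0266"),
      ("Q4R8P0,IPR029058;IPR000073;IPR0266", "B2UDV1,IPR0104")
    )
    println(pairs.map(canonicalize).distinct)
  }
}
```

On the DataFrame side, the analogous move would be `least`/`greatest` from `org.apache.spark.sql.functions` on the string columns, followed by `dropDuplicates()`.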