如何使用Scala删除重复的元组?笛卡尔Scala Spark

阿米莉

实际上,我有一个包含一些蛋白质名称及其结构域的RDD。我使用了笛卡尔函数来确定可能的对。结果,我不幸地获得了重复的对。我如何只保留一个元组并删除重复的元组?这是一个例子:

+------------------------------------+------------------------------------+
|             Protein1               |                Protein2            |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
|(P0CW05,IPR004372;IPR000890)        |(P0C2L1,IPR0179)                    |
|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)| 
|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104)                    |
+------------------------------------+------------------------------------+

我想拥有:

+------------------------------------+------------------------------------+
|             Protein1               |                Protein2            |
+------------------------------------+------------------------------------+
|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)| 
+------------------------------------+------------------------------------+
曼尼什

我已根据以下解决方案提供和实施的信息假定了输入数据。

它确实:

  1. 将RDD转换为spark数据帧。
  2. 根据length(Protein1)> length(Protein2)每秒交换一次输入。
  3. 并使用dropDuplicates方法删除重复项。
  4. 存储在数据帧中,然后存储在RDD中。

注意:要实现此目的,必须满足“ length(Protein1)> length(Protein2)”的要求。如果OP提供了更多的输入数据清晰度,则可以使用更多的解决方案。

//Creating the paired RDD as provided by OP
var x: RDD[(String, String)] = sc.parallelize(Seq((("P0C2L1,IPR0179"), ("P0CW05,IPR004372;IPR000890")), (("P0CW05,IPR004372;IPR000890"),("P0C2L1,IPR0179") ), (("B2UDV1,IPR0104"),("Q4R8P0,IPR029058;IPR000073;IPR0266")), (("Q4R8P0,IPR029058;IPR000073;IPR0266"),("B2UDV1,IPR0104"))   ))

//Creating as spark dataframe out of this RDD
var combDF = spark.createDataFrame(x).toDF("Protein1","Protein2")
combDF.show(20,false)

//+------------------------------------+------------------------------------+
//|Protein1                            |Protein2                            |
//+------------------------------------+------------------------------------+
//|(P0C2L1,IPR0179)                    |(P0CW05,IPR004372;IPR000890)        |
//|(P0CW05,IPR004372;IPR000890)        |(P0C2L1,IPR0179)                    |
//|(B2UDV1,IPR0104)                    |(Q4R8P0,IPR029058;IPR000073;IPR0266)|
//|(Q4R8P0,IPR029058;IPR000073;IPR0266)|(B2UDV1,IPR0104)                    |
//+------------------------------------+------------------------------------+

// creating temporary views
combDF.createOrReplaceTempView("combDF")

// Below statement is only required for this example just to cast to struct
combDF = spark.sql("""select named_struct("col1", element_at(split(Protein1,","),1), "col2",  element_at(split(Protein1,","),2)) as Protein1,
                       named_struct("col1", element_at(split(Protein2,","),1), "col2",  element_at(split(Protein2,","),2)) as Protein2
                 from combDF""")
//end 

combDF.createOrReplaceTempView("combDF")
combDF.show()
var result = spark.sql("""
  |select case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 2) 
  |            else element_at(protein_array, 1) 
  |            end as Protein1,
  |       case when length(Protein1_m) > length(Protein2_m) then element_at(protein_array, 1) 
  |            else element_at(protein_array, 2) 
  |            end as Protein2
  |from 
  |(select Protein1, Protein2, cast(Protein1 as string) as Protein1_m, cast(Protein2 as string) as Protein2_m,
  |        array(Protein1,Protein2) as protein_array
  |from combDF) a
""".stripMargin).dropDuplicates()
// Result in spark dataframe
result.show(20,false)

//+-----------------+-------------------------------------+
//|Protein1         |Protein2                             |
//+-----------------+-------------------------------------+
//|(B2UDV1,IPR0104) |(Q4R8P0,IPR029058;IPR000073;IPR0266) |
//|(P0C2L1,IPR0179) |(P0CW05,IPR004372;IPR000890)         |
//+-------------------+-----------------------------------+

// result in RDD
var resultRDD = result.rdd
resultRDD.collect().foreach(println)

//[(B2UDV1,IPR0104),(Q4R8P0,IPR029058;IPR000073;IPR0266)]   
//[(P0C2L1,IPR0179),(P0CW05,IPR004372;IPR000890)]

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Scala的Spark Streaming中获得两个DStream的笛卡尔积?

来自分类Dev

如何在Scala的Spark Streaming中获得两个DStream的笛卡尔积?

来自分类Dev

两个RDD的笛卡尔积和笛卡尔的总和导致Spark(scala)?

来自分类Dev

从Spark Scala删除重复的密钥

来自分类Dev

两个数据帧的笛卡尔积和笛卡尔结果的总和在 Spark(scala) 中?

来自分类Dev

Spark - 加速笛卡尔积

来自分类Dev

如何使用 spark-scala 删除 rdd 中的 unicode?

来自分类Dev

如何在Scala中使用Spark RDD删除重复项(更像是基于多个属性的过滤器)?

来自分类Dev

比较文档并删除Spark和Scala中的重复项

来自分类Dev

在Spark Scala中的JSON解析中删除重复属性

来自分类Dev

比较文档并删除Spark和Scala中的重复项

来自分类Dev

Spark Scala 2.10元组限制

来自分类Dev

如何使用Scala在Spark中创建SQLContext?

来自分类Dev

如何使用Spark / Scala展平集合?

来自分类Dev

如何使用Scala在Spark中处理日期?

来自分类Dev

如何使用spark / scala解析YAML

来自分类Dev

如何使用Spark Scala获取年份计数

来自分类Dev

如何使用Scala模拟Spark DataFrameReader?

来自分类Dev

使用SBT运行Scala Spark

来自分类Dev

Spark:在Scala中使用mapPartition

来自分类Dev

如何删除Scala Spark中的多个字符?

来自分类Dev

使用 Scala 删除 Spark 1.6 输出中的 Unicode 值

来自分类Dev

如何在Spark中使用Scala从多个文件中删除前几行/标题

来自分类Dev

Spark Scala运行

来自分类Dev

ClassNotFoundException Spark提交Scala

来自分类Dev

Spark Scala了解reduceByKey(_ + _)

来自分类Dev

在Spark Scala中旋转

来自分类Dev

Spark / Scala近似分组

来自分类Dev

从Spark Scala连接Presto

Related 相关文章

热门标签

归档