Pyspark:如何在 HDFS 中并行化多 gz 文件处理

诺蒂拉斯

我有许多gz文件存储在 20 个节点HDFS集群中,需要按列聚合。这些gz文件非常大(每个 1GByte,总共 200 个文件)。数据格式为key value,有2个列值: ['key','value1','value2'], 需要按key分组,按列聚合:sum(value1), count(value2)

数据已经按键排序,每个 gz 文件都有唯一的键值。

例如:

File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4

File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10

File 3:
k6,...
...
...

File 200:
k200,v200,u200
k201,v201,u201

我首先解析日期并将数据转换为(key, list of (values))结构。解析器输出将是这样的:

parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])

然后使用reduceByKey函数按键值分组,这比groupByKey函数更有效

reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])

然后使用 process 函数聚合列:

process 
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))

这是该过程的示例代码

import pyspark
from pyspark import SparkFiles

def parser(line):
    try:
        key,val=line.split('\t)
        return (key,[val1,val2])
    except:
        return None

def process(line):
    key,gr= line[0],line[1]
    vals=zip(*gr)

    val1=sum(vals[0])
    val2=len(vals[1])
    return ('\t'.join([key,val1,val2]))

sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')

我认为这段代码没有充分利用火花集群。我喜欢优化代码以充分利用考虑的处理。

1. 在 HDFS 和 Pyspark 中处理 gz 文件的最佳方法是什么?--如何将gz文件处理完全分发到整个集群?

2、如何充分利用每个节点的所有CPU?用于聚合和解析过程

塞尔奈

您至少应该考虑以下几点:

  1. 如果您使用的是 YARN,则为您分配给 Spark 应用程序的执行程序数量和每个执行程序的内核数。它们可以由 --num-executors 和 --executor-cores 控制。如果您不使用 YARN,您的调度程序可能会有类似的机制来控制并行性,请尝试寻找它。
  2. DataFrame 中的分区数,直接影响作业中的并行度。您可以使用repartition和/或coalesce来控制它

两者都可以限制作业使用的核心,从而限制集群的使用。此外,请考虑到使用更多 CPU 不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,通常归结为试验不同的配置,看看哪一个具有更好的性能。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何处理 tar.gz 中的 .desktop 文件

来自分类Dev

HDFS中的文件如何处理Spark分区?

来自分类Dev

从pyspark读取hdfs中的文件

来自分类Dev

如何在 HDFS 文件中搜索

来自分类Dev

如何将HDFS文件输入到R mapreduce中进行处理并将结果获取到HDFS文件中

来自分类Dev

通过Python处理HDFS中的多个文件

来自分类Dev

如何在R中创建tar.gz文件?

来自分类Dev

如何在Windows中打开.tar.gz文件?

来自分类Dev

如何在Java中合并多个.gz文件?

来自分类Dev

如何在C#中创建tar.gz文件

来自分类Dev

如何在Java中找出Corrupt gz文件

来自分类Dev

Hadoop:如何将XML文件存储在HDFS中并由Mappers处理?

来自分类Dev

如何在HDFS中检索已永久删除的文件

来自分类Dev

如何在Spark中读取HDFS序列文件

来自分类Dev

如何在 HDFS 中查找访问频率较低的文件

来自分类Dev

gzip文件如何存储在HDFS中

来自分类Dev

小文件如何存储在HDFS中

来自分类Dev

如何从HDFS删除文件?

来自分类Dev

如何在pyspark中处理异常?

来自分类Dev

如何在现有的tar.gz归档文件中添加/更新文件?

来自分类Dev

如何在.tar.gz中获取文件大小(原始文件大小)而不解压缩?

来自分类Dev

Pyspark:hdfs 中没有这样的文件或目录

来自分类Dev

如何在Apache NiFi中配置putHDFS处理器,以便可以通过网络将文件从本地计算机传输到HDFS?

来自分类Dev

如何在 spark 中解析 json 文件?以及如何在 spark 或 hdfs 中插入 dynamo DB?

来自分类Dev

如何使用wholeTextFiles在Spark中读取gz文件

来自分类Dev

如何从.gz文件中搜索特定的字符串?

来自分类Dev

如何从github R包中获取tar.gz文件

来自分类Dev

如何使用wholeTextFiles在Spark中读取gz文件

来自分类Dev

如何在Ubuntu 15.10中将tar.gz文件中的Robomongo安装为程序

Related 相关文章

  1. 1

    如何处理 tar.gz 中的 .desktop 文件

  2. 2

    HDFS中的文件如何处理Spark分区?

  3. 3

    从pyspark读取hdfs中的文件

  4. 4

    如何在 HDFS 文件中搜索

  5. 5

    如何将HDFS文件输入到R mapreduce中进行处理并将结果获取到HDFS文件中

  6. 6

    通过Python处理HDFS中的多个文件

  7. 7

    如何在R中创建tar.gz文件?

  8. 8

    如何在Windows中打开.tar.gz文件?

  9. 9

    如何在Java中合并多个.gz文件?

  10. 10

    如何在C#中创建tar.gz文件

  11. 11

    如何在Java中找出Corrupt gz文件

  12. 12

    Hadoop:如何将XML文件存储在HDFS中并由Mappers处理?

  13. 13

    如何在HDFS中检索已永久删除的文件

  14. 14

    如何在Spark中读取HDFS序列文件

  15. 15

    如何在 HDFS 中查找访问频率较低的文件

  16. 16

    gzip文件如何存储在HDFS中

  17. 17

    小文件如何存储在HDFS中

  18. 18

    如何从HDFS删除文件?

  19. 19

    如何在pyspark中处理异常?

  20. 20

    如何在现有的tar.gz归档文件中添加/更新文件?

  21. 21

    如何在.tar.gz中获取文件大小(原始文件大小)而不解压缩?

  22. 22

    Pyspark:hdfs 中没有这样的文件或目录

  23. 23

    如何在Apache NiFi中配置putHDFS处理器,以便可以通过网络将文件从本地计算机传输到HDFS?

  24. 24

    如何在 spark 中解析 json 文件?以及如何在 spark 或 hdfs 中插入 dynamo DB?

  25. 25

    如何使用wholeTextFiles在Spark中读取gz文件

  26. 26

    如何从.gz文件中搜索特定的字符串?

  27. 27

    如何从github R包中获取tar.gz文件

  28. 28

    如何使用wholeTextFiles在Spark中读取gz文件

  29. 29

    如何在Ubuntu 15.10中将tar.gz文件中的Robomongo安装为程序

热门标签

归档