我有许多gz
文件存储在 20 个节点HDFS
集群中,需要按列聚合。这些gz
文件非常大(每个 1GByte,总共 200 个文件)。数据格式为key value,有2个列值: ['key','value1','value2']
, 需要按key分组,按列聚合:sum(value1)
, count(value2)
。
数据已经按键排序,每个 gz 文件都有唯一的键值。
例如:
File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4
File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10
File 3:
k6,...
...
...
File 200:
k200,v200,u200
k201,v201,u201
我首先解析日期并将数据转换为(key, list of (values))
结构。解析器输出将是这样的:
parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])
然后使用reduceByKey
函数按键值分组,这比groupByKey
函数更有效。
reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])
然后使用 process 函数聚合列:
process
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))
这是该过程的示例代码
import pyspark
from pyspark import SparkFiles
def parser(line):
try:
key,val=line.split('\t)
return (key,[val1,val2])
except:
return None
def process(line):
key,gr= line[0],line[1]
vals=zip(*gr)
val1=sum(vals[0])
val2=len(vals[1])
return ('\t'.join([key,val1,val2]))
sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')
我认为这段代码没有充分利用火花集群。我喜欢优化代码以充分利用考虑的处理。
1. 在 HDFS 和 Pyspark 中处理 gz 文件的最佳方法是什么?--如何将gz文件处理完全分发到整个集群?
2、如何充分利用每个节点的所有CPU?用于聚合和解析过程
您至少应该考虑以下几点:
两者都可以限制作业使用的核心,从而限制集群的使用。此外,请考虑到使用更多 CPU 不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,通常归结为试验不同的配置,看看哪一个具有更好的性能。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句