我们正在尝试在EMR中运行ETL。S3中有大约2000亿个事件,即gzip json行。它们总共约有30个文件。我正在使用pyspark。
这是代码,
def value_to_list(columns):
def value_map(values):
data = []
for val in values:
d = val.asDict()
data.append([d[column] for column in columns])
return data
return value_map
def main():
sc = SparkContext()
sql_context = SQLContenxt(sc)
all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
all_events.registerTempTable('allevents')
for event_type in event_types:
process_event(sc, event_type, "allevents")
def process_event(sparkcontext, etype, tablename):
query = "select app_id, source, type, {time_cols}, count(*) as total " \
"from {table} where type = '{event_type}' " \
"group by app_id, source, type, {time_cols}"
time_cols_spec = [('hour', 'day', 'month', 'year'),
('day', 'month', 'year'),
('month', 'year'),
('year')]
for time_cols in time_cols_spec:
final_query = query.format(time_cols=", ".join(time_cols),
table=tablename,
event_type=etype)
dataframe = sql_context.sql(final_query)
dataframe.rdd.groupBy(lambda r: r['app_id'])\
.mapValues(value_to_list(['source'] + time_cols))\
.saveAsTextFile("s3n://...")
因此,我们大约有30种类型的事件,对于每个事件,我将按小时,日,月和年的4种组合进行汇总。因此,每个查询4个。我们总共有大约2000M个事件。
我正在继续运行
m3.2xlarge
问题是,最后的保存要花很长时间。上次我查询时花了14个小时进行了2次组合和1次事件:(
我知道我不会并行进行。循环是顺序的。并且有2个循环。但我希望rdd,groupBy
,mapValues
以运行并行。当我看到事件时间轴时,我看到它saveAsTextFile
花费了99%的时间。可能是因为火花延迟执行。
我需要使此过程并行且快速。我怎样才能做到这一点?
您可以应用4种主要的优化方法:
您正在对未针对查询进行优化的纯json文件执行聚合。将它们重写为镶木地板,按事件类型重新分区并存储在S3上-它们将占用更少的空间,并且您的应用程序将获得不错的速度提升。
增加并行度。在这种功能强大的VM上不需要驱动程序(主机),而是生成一个较小的实例(m3.medium
例如),并使用所有三个大型实例供工作人员使用。
将RDD API调用替换为Dataframes:.rdd.groupBy().mapValues()
可以替换为.groupBy(dataframe.app_id).agg(collect_list())
,然后进行某些映射。
您可以对(小时,日,月,年)数据集的原始数据执行查询,然后使用此聚合对给定事件进行所有剩余查询的查询。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句