用sqlcontext激发并行查询

希普卢·莫卡丁

我们正在尝试在EMR中运行ETL。S3中有大约2000亿个事件,即gzip json行。它们总共约有30个文件。我正在使用pyspark。

这是代码,

def value_to_list(columns):
    def value_map(values):
        data = []
        for val in values:
            d = val.asDict()
            data.append([d[column] for column in columns])
        return data

    return value_map


def main():
    sc = SparkContext()
    sql_context = SQLContenxt(sc)

    all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
    all_events.registerTempTable('allevents')

    for event_type in event_types:
        process_event(sc, event_type, "allevents")


def process_event(sparkcontext, etype, tablename):
    query = "select app_id, source, type, {time_cols}, count(*) as total " \
            "from {table} where  type = '{event_type}' " \
            "group by app_id, source, type, {time_cols}"
    time_cols_spec = [('hour', 'day', 'month', 'year'),
                      ('day', 'month', 'year'),
                      ('month', 'year'),
                      ('year')]

    for time_cols in time_cols_spec:
        final_query = query.format(time_cols=", ".join(time_cols),
                                   table=tablename,
                                   event_type=etype)
        dataframe = sql_context.sql(final_query)

        dataframe.rdd.groupBy(lambda r: r['app_id'])\
            .mapValues(value_to_list(['source'] + time_cols))\
            .saveAsTextFile("s3n://...")

因此,我们大约有30种类型的事件,对于每个事件,我将按小时,日,月和年的4种组合进行汇总。因此,每个查询4个。我们总共有大约2000M个事件。

我正在继续运行

  • AWS EMR(5.0.3)
  • Apache Spark 2.0.1
  • 1个主人,2个工人
  • 每台机器是 m3.2xlarge
  • 总内存为90GB

问题是,最后的保存要花很长时间。上次我查询时花了14个小时进行了2次组合和1次事件:(

我知道我不会并行进行。循环是顺序的。并且有2个循环。但我希望rdd,groupBymapValues以运行并行。当我看到事件时间轴时,我看到它saveAsTextFile花费了99%的时间。可能是因为火花延迟执行。

我需要使此过程并行且快速。我怎样才能做到这一点?

马里乌斯

您可以应用4种主要的优化方法:

  1. 您正在对未针对查询进行优化的纯json文件执行聚合。将它们重写为镶木地板,按事件类型重新分区并存储在S3上-它们将占用更少的空间,并且您的应用程序将获得不错的速度提升。

  2. 增加并行度。在这种功能强大的VM上不需要驱动程序(主机),而是生成一个较小的实例(m3.medium例如),并使用所有三个大型实例供工作人员使用。

  3. 将RDD API调用替换为Dataframes:.rdd.groupBy().mapValues()可以替换为.groupBy(dataframe.app_id).agg(collect_list()),然后进行某些映射。

  4. 您可以对(小时,日,月,年)数据集的原始数据执行查询,然后使用此聚合对给定事件进行所有剩余查询的查询。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

用jdbc和mysql并行化查询是否值得?

来自分类Dev

用激发精神中的其他弦代替点燃

来自分类Dev

带有标头的 Spark SQLContext 查询

来自分类Dev

并行运行查询

来自分类Dev

Firebase 并行查询

来自分类Dev

使Linq查询并行运行

来自分类Dev

如何并行执行 Redshift 查询

来自分类Dev

用数组并行输入GNU?

来自分类Dev

通过流并行执行多个查询

来自分类Dev

实体框架是否支持并行异步查询?

来自分类Dev

PHP / MYSQL并行代码/查询执行

来自分类Dev

与promises / bluebird并行的猫鼬查询?

来自分类Dev

并行查询多个SQL Server表

来自分类Dev

SQL查询以透视和合并行

来自分类Dev

查询是否在Couchbase中并行化

来自分类Dev

Node.js中的并行SQLite查询

来自分类Dev

在MS Access SQL查询中合并行

来自分类Dev

查询重新并行运行循环脚本

来自分类Dev

如何并行而不是顺序执行多个查询?

来自分类Dev

SQL查询根据定义的序列合并行

来自分类Dev

我如何运行合并行的查询?

来自分类Dev

如何并行查询不同类型的文档?

来自分类Dev

Apache Drill 并行查询是按顺序完成的

来自分类Dev

Apache NiFi - “ExecuteSQL”并行运行查询?

来自分类Dev

用gnu并行csh数组/命令替换

来自分类Dev

用wget并行下载几个文件

来自分类Dev

用twitter gem进行并行调用

来自分类Dev

验证Hive单查询和多查询并行性

来自分类Dev

无法从Apache Spark SQL 1.5.2在SQLContext中运行查询,获取java.lang.NoSuchMethodError