我也尝试了PyArrow,在我的示例中,我使用spark.sql语句获取了spark datframe。之后,我想转换为pandas数据框。为了显示执行时间,我运行了以下语句。
import time
startTime = time.time()
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了1021.55
我也试过了
import time
startTime = time.time()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df=df.toPandas()
executionTime = (time.time() - startTime)
executionTime
这给了1008.71
简要介绍一下数据框的形状是(944,5)。以下是Spark数据框中的数据类型
import pandas as pd
pd.set_option('max_colwidth', -1) # to prevent truncating of columns in jupyter
def count_column_types(spark_df):
"""Count number of columns per type"""
return pd.DataFrame(spark_df.dtypes).groupby(1, as_index=False)[0].agg({'count':'count', 'names':lambda x: " | ".join(set(x))}).rename(columns={1:"type"})
count_column_types(df)
type count names
0 bigint 1 col4
1 date 1 col1
2 decimal(20,4) 1 col5
3 int 1 col2
4 string 1 col3
请让我知道我有什么办法可以提高效率
的spark.sql.execution.arrow.pyspark.enabled
,如果你正在使用所谓的熊猫UDF的有效果,但不是在你的情况。
您的问题是toPandas
需要从执行程序到驱动程序节点收集所有数据,但是在此之前,它需要处理SQL查询,并且可能存在主要瓶颈(您没有显示示例,所以很难说)。您可以尝试了解瓶颈所在的位置-在SQL查询执行中,或者实际上在瓶颈中toPandas
。为此,请尝试以下操作:
df = spark.sql(....)
import time
startTime = time.time()
df.write.format("noop").mode("overwrite").save()
executionTime = (time.time() - startTime)
executionTime
并将执行时间与您从中获得的时间进行比较toPandas
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句