在本文的基础上,我实现了自定义模式公式,但是发现此函数的性能存在问题。本质上,当我进入此聚合时,我的群集仅使用我的一个线程,这对性能没有太大帮助。我正在对16k行中的150多个属性(主要是分类数据)进行计算,我认为我可以将其拆分为单独的线程/进程,并在以后将其放回单个数据帧中。请注意,此聚合必须位于两列上,因此由于无法使用单个列作为索引,我的性能可能会变差。
有没有一种方法可以将迟来的期货或并行处理纳入汇总计算中?
import dask.dataframe as dd
from dask.distributed import Client
from pandas import DataFrame
def chunk(s):
return s.value_counts()
def agg(s):
s = s._selected_obj
return s.groupby(level=list(range(s.index.nlevels))).sum()
def finalize(s):
# s is a multi-index series of the form (group, value): count. First
# manually group on the group part of the index. The lambda will receive a
# sub-series with multi index. Next, drop the group part from the index.
# Finally, determine the index with the maximum value, i.e., the mode.
level = list(range(s.index.nlevels - 1))
return (
s.groupby(level=level)
.apply(lambda s: s.reset_index(level=level, drop=True).argmax())
)
def main() -> DataFrame:
client = Client('scheduler:8786')
ddf = dd.read_csv('/sample/data.csv')
custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
result = ddf.groupby(['a','b']).agg(custom_mode).compute()
return result
旁注,我正在使用Docker通过daskdev / dask(2.18.1)Docker镜像启动我的调度程序和工作程序。
最后,我使用期货从本质上并行化了每一列的聚合。因为我有很多列,所以将每个聚合传递给它自己的工作线程节省了我很多时间。感谢David的评论以及dask文档中有关并行工作负载的文章!
from dask.distributed import Client
from pandas import DataFrame
def chunk(s):
return s.value_counts()
def agg(s):
s = s._selected_obj
return s.groupby(level=list(range(s.index.nlevels))).sum()
def finalize(s):
level = list(range(s.index.nlevels - 1))
return (
s.groupby(level=level)
.apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
)
def delayed_mode(ddf, groupby, col, custom_agg):
return ddf.groupby(groupby).agg({col: custom_agg}).compute()
def main() -> DataFrame:
client = Client('scheduler:8786')
ddf = dd.read_csv('/sample/data.csv')
custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
futures = []
for col in multiple_trimmed.columns:
future = client.submit(delayed_mode, ddf, ["a", "b"], col, custom_mode_dask)
futures.append(future)
ddfs = client.gather(futures)
result = pd.concat(ddfs, axis=1)
return result
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句