我有这两个虚拟数据帧：
# Build the two dummy data frames used throughout the question.
# NOTE: imports must come first — the original called np.random.seed()
# before `import numpy as np`, and never imported pandas at all.
import numpy as np
import pandas as pd

np.random.seed(12345)

# df1: per-name [start_date, end_date) intervals.
df1 = pd.DataFrame({
    'name': ['A'] * 4 + ['B'] * 4,
    'start_date': pd.to_datetime(['2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17',
                                  '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22']),
    'end_date': pd.to_datetime(['2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19',
                                '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01']),
})

# df2: one row per (name, date) with random low/high, high >= low by construction.
date = pd.date_range('2000-01-01', '2002-01-01')
name = ['A'] * len(date) + ['B'] * len(date)
date = date.append(date)  # duplicate the range: one copy per name
low = np.random.rand(len(date))
high = low + np.random.rand(len(date))
df2 = pd.DataFrame({'name': name, 'date': date, 'low': low, 'high': high})
对于df1中的每一行,我都会得到名称,开始日期和结束日期。
我想在df2中找到名称相同、且日期位于该行开始日期和结束日期之间的所有行，并取其中high列的最大值和low列的最小值。
以下是我当前的解决方案。
# Baseline solution: for every interval row in df1, slice that name's
# rows of df2 between start_date and end_date (searchsorted on the
# sorted date index) and record max('high') / min('low').
# The scraped original had lost all indentation; restored here.
df1 = df1.set_index('name')
df2 = df2.set_index(['name', 'date'])
df2 = df2.sort_index()  # searchsorted below requires a sorted index
df1['max'] = -1
df1['min'] = -1
for name in df1.index.unique():
    df = df2.loc[name]  # all rows for this name, indexed by date
    tmphigh = []
    tmplow = []
    # itertuples yields (index, start_date, end_date, max, min);
    # the index and the two placeholder columns are ignored.
    for (_, start_date, end_date, _, _) in df1.loc[name].itertuples(name=None):
        # half-open [start, end) slice over the sorted dates
        newdf = df.iloc[df.index.searchsorted(start_date): df.index.searchsorted(end_date)]
        tmphigh.append(newdf.high.max())
        tmplow.append(newdf.low.min())
    df1.loc[[name], ['max']] = tmphigh
    df1.loc[[name], ['min']] = tmplow
但是,应用超过一百万行仍然需要相当长的时间。我想知道是否有更快的方法。
[编辑]:感谢Pramote Kuacharoen,我能够修改他的一些代码,并比现有代码实现6倍的加速。
分成多个循环的原因是,我发现将df2 [name]的生成包含在apply函数中将导致计算时间的显着增加。
因此,我将其分开进行计算,这可能有助于减少函数调用以提取df2名称下的所有值。
如果有人可以提出比我更好的方法,我将感到非常高兴。但这对我来说已经足够了。
下面是我目前的解决方案
# ~6x faster variant: group both frames by name once, then slice each
# name's df2 group per row of df1 via searchsorted inside an apply.
# The scraped original had lost all indentation; restored here.
from tqdm import tqdm

df1a = df1.groupby('name')
df2a = df2.groupby('name')
mergedf = df1
mergedf['maximum'] = -1
mergedf['minimum'] = -1

def get_min_max(row):
    """Return max 'high' / min 'low' of df2x between row's start/end dates.

    df2x is rebound in the loop below to the current name's group
    (date-indexed and sorted), which keeps the expensive group
    extraction out of the per-row apply.
    """
    dfx = df2x.iloc[df2x.index.searchsorted(row['start_date']): df2x.index.searchsorted(row['end_date'])]
    maximum = dfx['high'].max()
    minimum = dfx['low'].min()
    return pd.Series({'maximum': maximum, 'minimum': minimum})

for name, df in tqdm(df1a):
    df2x = df2a.get_group(name)
    mergedf.loc[[name], ['maximum', 'minimum']] = df.apply(get_min_max, axis=1)
# Self-contained setup for the answer: rebuild the demo frames and index
# df2 by date so .loc date-range slicing works in the helpers below.
# (Imports hoisted to the top — the original buried `import numpy` mid-script.)
import numpy as np
import pandas as pd

df1 = pd.DataFrame({
    'name': ['A'] * 4 + ['B'] * 4,
    'start_date': pd.to_datetime(['2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17',
                                  '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22']),
    'end_date': pd.to_datetime(['2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19',
                                '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01']),
})

date = pd.date_range('2000-01-01', '2002-01-01')
name = ['A'] * len(date) + ['B'] * len(date)
date = date.append(date)  # duplicate the range: one copy per name
low = np.random.rand(len(date))
high = low + np.random.rand(len(date))
df2 = pd.DataFrame({'name': name, 'date': date, 'low': low, 'high': high})
df2 = df2.set_index('date')
def find_max(row):
    """Return the max 'high' in the global df2 for row['name'] within
    [start_date, end_date] (inclusive label slice on the date index)."""
    return df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], 'high'].max()
def find_min(row):
    """Return the min 'low' in the global df2 for row['name'] within
    [start_date, end_date] (inclusive label slice on the date index)."""
    return df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], 'low'].min()
# Row-wise apply of the two helpers; the two assignments are independent,
# so order does not matter.
df1['minimum'] = df1.apply(find_min, axis=1)
df1['maximum'] = df1.apply(find_max, axis=1)
尝试一次通话找到最大和最小。这样可以节省一些时间。
def find_min_max(row):
    """Return a Series with max 'high' and min 'low' of the global df2
    rows matching row['name'] within [start_date, end_date].

    Filtering name and slicing dates once (instead of once per column)
    saves one pass over df2 compared to find_max + find_min.
    """
    dfx = df2[df2['name'] == row['name']].loc[row['start_date']:row['end_date'], ['high', 'low']]
    maximum = dfx['high'].max()
    minimum = dfx['low'].min()
    return pd.Series({'maximum': maximum, 'minimum': minimum})
# merge() returns a NEW frame — the original discarded the result;
# assign it back so df1 actually gains the maximum/minimum columns.
df1 = df1.merge(df1.apply(find_min_max, axis=1), left_index=True, right_index=True)
尝试以下一项:多处理和共享内存。将其保存在.py文件中,然后使用命令行运行它。它应该快得多。我将n_workers设置为4。您可以更改它。
import numpy as np
import pandas as pd
from multiprocessing.shared_memory import SharedMemory
from concurrent.futures import ProcessPoolExecutor, as_completed
def find_min_max(name, data_info):
    """Worker: compute per-interval max 'high' / min 'low' for one name.

    data_info holds two (shm_name, shape, dtype) tuples describing the
    df1 and df2 record arrays living in shared memory. Results are
    written in place into the df1 array's 'maximum'/'minimum' fields,
    addressed by the 'index' column preserved by DataFrame.to_records().
    The scraped original had lost all indentation and leaked both
    SharedMemory handles; both are fixed here.
    """
    shm_name, shape, dtype = data_info[0]
    shm1 = SharedMemory(shm_name)
    np1 = np.recarray(shape=shape, dtype=dtype, buf=shm1.buf)
    shm_name, shape, dtype = data_info[1]
    shm2 = SharedMemory(shm_name)
    np2 = np.recarray(shape=shape, dtype=dtype, buf=shm2.buf)
    # Boolean-mask indexing copies, so data1/data2 do not pin the buffers.
    data1 = np1[np1['name'] == name]
    data2 = np2[np2['name'] == name]  # assumed sorted by date (main sorts) — searchsorted requires it
    for rec in data1:
        idx1 = np.searchsorted(data2['date'], rec['start_date'])
        idx2 = np.searchsorted(data2['date'], rec['end_date'])
        data = data2[idx1:idx2]
        np1[rec['index']]['maximum'] = data['high'].max()
        np1[rec['index']]['minimum'] = data['low'].min()
    # Release the buffer views before closing, otherwise close() raises
    # BufferError ("cannot close exported pointers exist").
    del np1, np2
    shm1.close()
    shm2.close()
def main():
    """Build the demo frames, copy them into shared memory as record
    arrays, and fan the per-name min/max computation out to a process
    pool; results are printed from the shared df1 array.

    Fixes over the scraped original: indentation restored, cleanup moved
    to try/finally, and the numpy views are deleted before close() —
    closing while a recarray still exports the buffer raises BufferError.
    """
    np.random.seed(12345)
    df1 = pd.DataFrame({'name': ['A']*4 + ['B']*4,
                        'start_date': pd.to_datetime(['2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17',
                                                      '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22']),
                        'end_date': pd.to_datetime(['2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19',
                                                    '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01'])})
    date = pd.date_range('2000-01-01', '2002-01-01')
    name = ['A']*len(date) + ['B']*len(date)
    date = date.append(date)
    low = np.random.rand(len(date))
    high = low + np.random.rand(len(date))
    df2 = pd.DataFrame({'name': name, 'date': date, 'low': low, 'high': high})
    # Sorting is required: workers rely on searchsorted over each name's dates.
    df1 = df1.sort_values('name')
    df2 = df2.sort_values(['name', 'date'])
    df1['maximum'] = -1.0
    df1['minimum'] = -1.0
    # Fixed-width dtypes so the frames round-trip through raw shared memory.
    np1 = df1.to_records(column_dtypes={
        'name': '|S20', 'start_date': '<M8[ns]', 'end_date': '<M8[ns]'})
    np2 = df2.to_records(column_dtypes={
        'name': '|S20', 'date': '<M8[ns]', 'low': '<f8', 'high': '<f8'})
    names = [str.encode(name) for name in df1['name'].unique()]
    del df1
    del df2
    # NOTE(review): fixed segment names fail with FileExistsError if a
    # previous run died before unlink(); kept for compatibility.
    shm1 = SharedMemory(name='d1', create=True, size=np1.nbytes)
    shm2 = SharedMemory(name='d2', create=True, size=np2.nbytes)
    shm1_np_array = np.recarray(shape=np1.shape, dtype=np1.dtype, buf=shm1.buf)
    shm2_np_array = np.recarray(shape=np2.shape, dtype=np2.dtype, buf=shm2.buf)
    try:
        np.copyto(shm1_np_array, np1)
        np.copyto(shm2_np_array, np2)
        data_info = [
            (shm1.name, np1.shape, np1.dtype),
            (shm2.name, np2.shape, np2.dtype),
        ]
        del np1
        del np2
        # Set number of workers
        n_workers = 4
        with ProcessPoolExecutor(n_workers) as exe:
            fs = [exe.submit(find_min_max, name, data_info)
                  for name in names]
            for _ in as_completed(fs):
                pass
        print(shm1_np_array)
    finally:
        # Drop the exporting views first so close() cannot raise BufferError.
        del shm1_np_array, shm2_np_array
        shm1.close()
        shm2.close()
        shm1.unlink()
        shm2.unlink()

if __name__ == "__main__":
    main()
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句