関数がありますvar
。システムが持つすべてのプロセッサ、コア、およびRAMメモリを利用して、マルチプロセッシング/並列処理によってこの関数内でループをすばやく実行するための最良の方法を知りたいです。
import numpy as np
from pysheds.grid import Grid
xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
def var(interest):
variable_avg = []
for (x,y) in zip(xs,ys):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = np.array(variablemask)
variablemean = np.nanmean(variablemask)
variable_avg.append(variablemean)
return(variable_avg)
関数var
の指定された複数のパラメーターに対して、関数とループの両方を並行して実行できれば素晴らしいと思います。例:通話var(a)
とvar(b)
同時に。複数の座標(xs、ys)のみのループを並列化するよりもはるかに時間がかからないためです。
pysheds
ドキュメントはここにあります。
のdata.tif
コードで使用されているデータは、ここから直接ダウンロードgrid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
できます。同じデータを異なる名前でディレクトリにコピーし、コードの代わりに、またはコードのテストに使用できます。a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
上記のコードを高速化するために、ここで提案がありました。それは次のとおりです。
def process_poi(interest, x, y):
grid = Grid.from_raster(interest, data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch')
variable = grid.view('catch', nodata=np.nan)
variable = np.array(variable)
return variable.mean()
async def var_loop_async(interest, pool, loop):
tasks = []
for (x,y) in zip(xs,ys):
function_call = functools.partial(process_poi, interest, x, y)
tasks.append(loop.run_in_executor(pool, function_call))
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
pool_start = time.time()
tasks = []
with ProcessPoolExecutor() as pool:
for _ in range(100):
tasks.append(var_loop_async(a, pool, loop))
results = await asyncio.gather(*tasks)
pool_end = time.time()
print(f'Process pool took {pool_end-pool_start}')
serial_start = time.time()
しかし、関数の呼び出し方がわかりませんでしたvar_loop_async(interest, pool, loop)
。実際、pool
との代わりにどのパラメーターを呼び出すかを取得できませんでしたloop
。
私はPythonプログラミングにとても慣れていません。
可能であれば、上記の提案を再現可能な解決策にして、Pythonで直接実行できるようにしてください。または、元のコードを高速化するためのより良い提案が他にある場合は、私に知らせてください。
まず、元のコードでは、次のように表示されます。
for (x,y) in zip(xs,ys):
grid = Grid.from_raster(interest, data_name='map')
私はpysheds
モジュールに精通しておらず、モジュールに関するドキュメントも見つからなかったためGrid.from_raster
、費用のかかる操作であるかどうかわかりません。しかし、このステートメントは、for
ループ内で再計算するのではなく、ループの上に移動するための候補であるようです。おそらく、これだけでパフォーマンスが大幅に向上します。リンク、Pythonの非同期関数で呼び出されるすべてのパラメーターは何ですか?、あなたが言及したことは、プロセスプールを作成するオーバーヘッドが問題に見合うだけの十分な補償をしないかもしれないことを示唆しています。また、Grid.from_raster
は高価であり、ループから削除することで利益を得ることができます。マルチプロセッシングソリューションは、本質的に、x、yペアごとに実行することで「ループに戻す」ため、パフォーマンスの向上を引き起こす可能性が低くなります。 。
とにかく、提案された手法を使用してコードを実行するには、以下を参照してください。残念ながら、プロセッサプールprocess_poi
との両方を実行することはできませんvar_loop_async
。しかし、別の解決策については、以下をさらに見てください。
import numpy
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
別の解決策
var
処理する各ファイルのプロセスプールで実行してから、サブプロセスで各x、yペアを処理できるようにする必要があります。つまり、ファイルを処理している各サブプロセスには、x、yペアを処理するための独自のプロセスプールが必要です。プロセスプール用に作成されたプロセスはデーモンプロセスであり(メインプロセスが終了すると自動的に終了します)、独自のサブプロセスを作成することは許可されていないため、これは通常は不可能です。これを克服するには、独自の特殊mutliprocessor.Pool
化を作成し、各サブプロセスを独自のプールで初期化する必要があります。
しかし、これはパフォーマンスの向上になりますか?var
サブプロセスは、基本的に待っている以外何もしないされているprocess_poi
自分の仕事を完了するために、サブプロセス。したがって、これが以前のコードよりも大幅に改善されるとは思いません。そして、私が述べたように、どちらのマルチプロセッシングソリューションが元のコード、特にGrid.from_raster
呼び出しを再配置するために変更されたコードよりも改善されるかどうかは明らかではありません。
import numpy
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os
# This allows subprocesses to have their own pools:
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
class MyPool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(MyPool, self).__init__(*args, **kwargs)
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
pool2 = None
def init_pool():
global pool2
#pool2 = Pool(5)
pool2 = Pool(os.cpu_count // 2) # half the available number of processors
def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(interest):
task = functools.partial(process_poi, interest)
return pool2.starmap(task, zip(xs, ys))
def main():
# This will create non-daemon processes so that these processes can create their own pools:
with MyPool(2, init_pool) as pool:
results = pool.map(var, [a, b])
print(results)
if __name__ == "__main__":
main()
スレッドを使用した3番目のソリューション
使用asyncio
:
import numpy
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = [
r'/home/test/image1.tif',
r'/home/test/image2.tif'
]
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=100) as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
代替:
import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'
def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(executor, interest):
return list(executor.map(functools.partial(process_poi, interest), xs, ys))
def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), [a, b]))
print(results)
if __name__ == "__main__":
main()
OPの更新されたコードに基づくスレッドを使用した更新されたソリューション
import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(executor, interest):
return list(executor.map(functools.partial(process_point, interest), xs, ys))
def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), file_list))
print(results)
if __name__ == "__main__":
main()
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加