Pythonの非同期関数で呼び出されるすべてのパラメーターは何ですか?

ガネーシャ
  1. 関数がありますvarシステムが持つすべてのプロセッサ、コア、およびRAMメモリを利用して、マルチプロセッシング/並列処理によってこの関数内でループをすばやく実行するための最良の方法を知りたいです。

    import numpy as np
    from pysheds.grid import Grid
    
    xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
    ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    def var(interest):
    
        variable_avg = []
        for (x,y) in zip(xs,ys):
            grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    
            grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label') 
    
            grid.clip_to('catch')
    
            grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    
            variablemask = grid.view('variable', nodata=np.nan)
            variablemask = np.array(variablemask)
            variablemean = np.nanmean(variablemask)
            variable_avg.append(variablemean)
        return(variable_avg)
    
    
  2. 関数varの指定された複数のパラメーターに対して、関数とループの両方を並行して実行できれば素晴らしいと思います。例:通話var(a)var(b)同時に。複数の座標(xs、ys)のみのループを並列化するよりもはるかに時間がかからないためです。

pyshedsドキュメントはここにあります

data.tifコードで使用されているデータは、ここから直接ダウンロードgrid = Grid.from_raster(r'/home/data/data.tif', data_name='map')できます同じデータを異なる名前でディレクトリにコピーし、コードの代わりに、またはコードのテストに使用できます。a = r'/home/test/image1.tif'b = r'/home/test/image2.tif'

上記のコードを高速化するために、ここ提案がありました。それは次のとおりです。

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time() 

しかし、関数の呼び出し方がわかりませんでしたvar_loop_async(interest, pool, loop)実際、poolとの代わりにどのパラメーターを呼び出すかを取得できませんでしたloop

私はPythonプログラミングにとても慣れていません。

可能であれば、上記の提案を再現可能な解決策にして、Pythonで直接実行できるようにしてください。または、元のコードを高速化するためのより良い提案が他にある場合は、私に知らせてください。

ブーブー

まず、元のコードでは、次のように表示されます。

for (x,y) in zip(xs,ys):
    grid = Grid.from_raster(interest, data_name='map')

私はpyshedsモジュールに精通しておらず、モジュールに関するドキュメントも見つからなかったためGrid.from_raster、費用のかかる操作であるかどうかわかりません。しかし、このステートメントは、forループ内で再計算するのではなくループの上に移動するための候補であるようです。おそらく、これだけでパフォーマンスが大幅に向上します。リンク、Pythonの非同期関数で呼び出されるすべてのパラメーター何ですか?、あなたが言及したことは、プロセスプールを作成するオーバーヘッドが問題に見合うだけの十分な補償をしないかもしれないことを示唆しています。また、Grid.from_raster は高価であり、ループから削除することで利益を得ることができます。マルチプロセッシングソリューションは、本質的に、x、yペアごとに実行することで「ループに戻す」ため、パフォーマンスの向上を引き起こす可能性が低くなります。 。

とにかく、提案された手法を使用してコードを実行するには、以下を参照してください。残念ながら、プロセッサプールprocess_poiとの両方実行することはできませんvar_loop_asyncしかし、別の解決策については、以下をさらに見てください。

import numpy
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

別の解決策

var処理する各ファイルのプロセスプールで実行してから、サブプロセスで各x、yペアを処理できるようにする必要があります。つまり、ファイルを処理している各サブプロセスには、x、yペアを処理するための独自のプロセスプールが必要です。プロセスプール用に作成されたプロセスはデーモンプロセスであり(メインプロセスが終了すると自動的に終了します)、独自のサブプロセスを作成することは許可されていないため、これは通常は不可能です。これを克服するには、独自の特殊mutliprocessor.Pool化を作成し、各サブプロセスを独自のプールで初期化する必要があります。

しかし、これはパフォーマンスの向上になりますか?varサブプロセスは、基本的に待っている以外何もしないされているprocess_poi自分の仕事を完了するために、サブプロセス。したがって、これが以前のコードよりも大幅に改善されるとは思いません。そして、私が述べたように、どちらのマルチプロセッシングソリューションが元のコード、特にGrid.from_raster呼び出しを再配置するために変更されたコードよりも改善されるかどうかは明らかではありません

import numpy
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os

# This allows subprocesses to have their own pools:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'


pool2 = None

def init_pool():
    global pool2
    #pool2 = Pool(5)
    pool2 = Pool(os.cpu_count // 2) # half the available number of processors


def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(interest):
    task = functools.partial(process_poi, interest)
    return pool2.starmap(task, zip(xs, ys))


def main():
    # This will create non-daemon processes so that these processes can create their own pools:
    with MyPool(2, init_pool) as pool:
        results = pool.map(var, [a, b])
        print(results)


if __name__ == "__main__":
    main()

スレッドを使用した3番目のソリューション

使用asyncio

import numpy
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = [
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
]

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=100) as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

代替:

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'



def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(executor, interest):
    return list(executor.map(functools.partial(process_poi, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), [a, b]))
        print(results)


if __name__ == "__main__":
    main()

OPの更新されたコードに基づくスレッドを使用した更新されたソリューション

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = numpy.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(executor, interest):
    return list(executor.map(functools.partial(process_point, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), file_list))
        print(results)


if __name__ == "__main__":
    main()

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

すべてのパラメーターなしで非同期関数を呼び出す/待つことは可能ですか?

分類Dev

関数呼び出しでパラメーターとして渡されるときの太い矢印構文の意味は何ですか?

分類Dev

オプションのパラメーターのデフォルト値は、すべての関数呼び出しで評価されますか?

分類Dev

すべてのOpenGLテクスチャ呼び出しで使用される「レベル」パラメータは正確には何ですか?

分類Dev

この関数がパラメーターとして呼び出されない変数に適用されるのはなぜですか?

分類Dev

ある非同期関数を別の非同期関数内で呼び出す最良の方法は何ですか?

分類Dev

Swiftで呼び出されるこれらのメソッドパラメータは何ですか?

分類Dev

非同期関数を同時に呼び出してすべてのコールバックを待機するにはどうすればよいですか?

分類Dev

別の関数でパラメータとして渡される関数を呼び出す方法

分類Dev

非同期メソッドC#のループ内で呼び出されるパラメーターとしてタスクを渡す

分類Dev

mmap(2)など、非同期セーフとして定義されていない関数は、シグナルハンドラーで呼び出される他の非同期セーフ関数に影響しますか?

分類Dev

関数の呼び出しで使用されていないパラメーターを渡す方法は?

分類Dev

関数Aを呼び出したときに、関数A内の関数Bにパラメーターが渡されるのはなぜですか?

分類Dev

リテラルに格納されているメンバー関数を動的に呼び出すのはTypeScriptアンチパターンですか?

分類Dev

関数呼び出しのパラメーターの前の@は、Pascalで何を意味しますか

分類Dev

$ .eachループ内の関数呼び出し、それは非同期ですか、それとも同期ですか?

分類Dev

interface {}パラメータを受け入れる関数は、参照によって呼び出される値をどのように更新できますか?

分類Dev

パラメータがintパラメータのみを持つ呼び出された関数のcharであるかどうかを検出する方法

分類Dev

呼び出されたときにパラメーターがコロンではなくコンマで分割されている場合の関数シグネチャとは何ですか?

分類Dev

なぜ非同期関数が2回呼び出されるのですか?

分類Dev

SwiftUIで非同期呼び出しクロージャを使用してパラメータを初期化するにはどうすればよいですか?

分類Dev

パラメータパック拡張のすべての基本クラスで関数を呼び出すことはできますか?

分類Dev

PythonでTkinterを使用してパラメーター化された関数を呼び出す方法は?

分類Dev

呼び出されるSessionRunHookのメンバー関数のシーケンスは何ですか?

分類Dev

別の関数のデフォルトパラメータとして非同期関数を使用するにはどうすればよいですか?

分類Dev

関数パラメーターは、呼び出される前に設定できますか?

分類Dev

パラメータとして渡された要素の関数を呼び出す方法はありますか?

分類Dev

Perlモジュールで呼び出される参照関数のドメインは何ですか?

分類Dev

Pythonの辞書によって呼び出される関数にパラメーターを渡す

Related 関連記事

  1. 1

    すべてのパラメーターなしで非同期関数を呼び出す/待つことは可能ですか?

  2. 2

    関数呼び出しでパラメーターとして渡されるときの太い矢印構文の意味は何ですか?

  3. 3

    オプションのパラメーターのデフォルト値は、すべての関数呼び出しで評価されますか?

  4. 4

    すべてのOpenGLテクスチャ呼び出しで使用される「レベル」パラメータは正確には何ですか?

  5. 5

    この関数がパラメーターとして呼び出されない変数に適用されるのはなぜですか?

  6. 6

    ある非同期関数を別の非同期関数内で呼び出す最良の方法は何ですか?

  7. 7

    Swiftで呼び出されるこれらのメソッドパラメータは何ですか?

  8. 8

    非同期関数を同時に呼び出してすべてのコールバックを待機するにはどうすればよいですか?

  9. 9

    別の関数でパラメータとして渡される関数を呼び出す方法

  10. 10

    非同期メソッドC#のループ内で呼び出されるパラメーターとしてタスクを渡す

  11. 11

    mmap(2)など、非同期セーフとして定義されていない関数は、シグナルハンドラーで呼び出される他の非同期セーフ関数に影響しますか?

  12. 12

    関数の呼び出しで使用されていないパラメーターを渡す方法は?

  13. 13

    関数Aを呼び出したときに、関数A内の関数Bにパラメーターが渡されるのはなぜですか?

  14. 14

    リテラルに格納されているメンバー関数を動的に呼び出すのはTypeScriptアンチパターンですか?

  15. 15

    関数呼び出しのパラメーターの前の@は、Pascalで何を意味しますか

  16. 16

    $ .eachループ内の関数呼び出し、それは非同期ですか、それとも同期ですか?

  17. 17

    interface {}パラメータを受け入れる関数は、参照によって呼び出される値をどのように更新できますか?

  18. 18

    パラメータがintパラメータのみを持つ呼び出された関数のcharであるかどうかを検出する方法

  19. 19

    呼び出されたときにパラメーターがコロンではなくコンマで分割されている場合の関数シグネチャとは何ですか?

  20. 20

    なぜ非同期関数が2回呼び出されるのですか?

  21. 21

    SwiftUIで非同期呼び出しクロージャを使用してパラメータを初期化するにはどうすればよいですか?

  22. 22

    パラメータパック拡張のすべての基本クラスで関数を呼び出すことはできますか?

  23. 23

    PythonでTkinterを使用してパラメーター化された関数を呼び出す方法は?

  24. 24

    呼び出されるSessionRunHookのメンバー関数のシーケンスは何ですか?

  25. 25

    別の関数のデフォルトパラメータとして非同期関数を使用するにはどうすればよいですか?

  26. 26

    関数パラメーターは、呼び出される前に設定できますか?

  27. 27

    パラメータとして渡された要素の関数を呼び出す方法はありますか?

  28. 28

    Perlモジュールで呼び出される参照関数のドメインは何ですか?

  29. 29

    Pythonの辞書によって呼び出される関数にパラメーターを渡す

ホットタグ

アーカイブ