共有データリソースにアクセスできるマルチプロセッシングアプリケーションを実装しようとしています。共有リソースに安全にアクセスできるように、ロックメカニズムを使用しています。しかし、私はエラーを打っています。驚いたことに、プロセス1が最初にロックを取得しようとすると、要求を処理し、ロックを取得しようとしている次のプロセスで失敗しますが、1以外のプロセスが最初にロックを取得しようとすると、最初の実行で失敗します。私はPythonを初めて使用し、ドキュメントを使用してこれを実装しているので、ここで基本的な安全メカニズムが欠落しているかどうかはわかりません。これを目撃している理由としてのデータポイントは非常に役立ちます。
プログラム:
#!/usr/bin/python
from multiprocessing import Process, Manager, Lock
import os
import Queue
import time
lock = Lock()
def launch_worker(d,l,index):
global lock
lock.acquire()
d[index] = "new"
print "in process"+str(index)
print d
lock.release()
return None
def dispatcher():
i=1
d={}
mp = Manager()
d = mp.dict()
d[1] = "a"
d[2] = "b"
d[3] = "c"
d[4] = "d"
d[5] = "e"
l = mp.list(range(10))
for i in range(4):
p = Process(target=launch_worker, args=(d,l,i))
i = i+1
p.start()
return None
if __name__ == '__main__':
dispatcher()
プロセス1が最初にサービスされるときのエラー
in process0
{0: 'new', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e'}
Process Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "dispatcher.py", line 10, in launch_worker
d[index] = "new"
File "<string>", line 2, in __setitem__
File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
self._connect()
File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
c = SocketClient(address)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
s.connect(address)
File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
プロセス2が最初にサービスされるときのエラー
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "dispatcher.py", line 10, in launch_worker
d[index] = "new"
File "<string>", line 2, in __setitem__
File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
self._connect()
File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 150, in Client
deliver_challenge(c, authkey)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 373, in deliver_challenge
response = connection.recv_bytes(256) # reject large message
IOError: [Errno 104] Connection reset by peer
ワーカーが変更するdictは、ディスパッチングプロセスによって管理される共有オブジェクトです。ワーカーがそのオブジェクトを変更するには、ディスパッチングプロセスと通信する必要があります。表示されるエラーは、ディスパッチャがワーカープロセスを起動した後、それらを待機していないという事実に起因します。終了が早すぎるため、必要なときに通信するために存在しない可能性があります。
共有dictを更新しようとする最初の1つまたは2つのワーカーは成功するManager
可能性があります。これは、共有dictを変更しても、インスタンスを含むプロセスがまだ存在している可能性があるためです(たとえば、さらにワーカーを作成している途中である可能性があります)。したがって、あなたの例では、いくつかの成功した出力が見られます。ただし、管理プロセスはすぐに終了し、変更を試みる次のワーカーは失敗します。(表示されるエラーメッセージは、プロセス間通信の試行が失敗した場合の典型的なものです。EOF
プログラムをさらに数回実行すると、エラーも表示される可能性があります。)
あなたがする必要があるのはjoin
、Process
オブジェクトのそれぞれが終了するのを待つ方法として、オブジェクトのメソッドを呼び出すことです。次の変更dispatcher
は、基本的な考え方を示しています。
def dispatcher():
mp = Manager()
d = mp.dict()
d[1] = "a"
d[2] = "b"
d[3] = "c"
d[4] = "d"
d[5] = "e"
procs = []
for i in range(4):
p = Process(target=launch_worker, args=(d,i))
procs.append(p)
p.start()
for p in procs:
p.join()
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加