考虑以下代码(解释如下):
import asyncio
class MyClass(object):
def __init__(self):
self.a = 0
def incr(self, data):
self.a += 1
print(self.a)
class GenProtocol(asyncio.SubprocessProtocol):
def __init__(self, handler, exit_future):
self.exit_future = exit_future
self.handler = handler
def pipe_data_received(self, fd, data):
if fd == 1:
self.handler(data)
else:
print('An error occurred')
def process_exited(self):
self.exit_future.set_result(True)
def start_proc(stdout_handler, *command):
loop = asyncio.get_event_loop()
exit_f = asyncio.Future(loop=loop)
subpr = loop.subprocess_exec(lambda: GenProtocol(stdout_handler, exit_f),
*command,
stdin=None)
transport, protocol = loop.run_until_complete(subpr)
@asyncio.coroutine
def waiter(exit_future):
yield from exit_future
return waiter, exit_f
def main():
my_instance = MyClass()
loop = asyncio.get_event_loop()
waiter, exit_f = start_proc(my_instance.incr, 'bash', 'myscript.sh')
loop.run_until_complete(waiter(exit_f))
loop.close()
if __name__ == '__main__':
main()
组件的简要说明如下:
MyClass
非常简单GenProtocol
是一个类,它允许为子流程的stdout上接收的数据指定自定义处理程序。start_proc
允许您通过以下方式启动自定义过程,为在stdout上接收的数据指定自定义处理程序 GenProtocol
my_proc
是一个永远运行的过程,可以在任意时间将数据发送到管道现在,我的问题是:由于我将方法用作处理程序,并且由于该方法以非原子方式更改实例属性,因此这是否潜在危险?例如,当我在子进程的管道上异步接收数据时,该处理程序是并发调用两次(因此有可能损坏MyClass.a中的数据)还是被序列化了(即第二次调用该处理程序,直到该处理程序才执行首先完成)?
协议方法是常规功能,而不是协程。他们里面没有屈服点。
因此,执行顺序非常简单:所有调用都已序列化,没有竞争条件。
UPD
在该示例pipe_data_received()
中,不是协程,而是没有await
/yield from
内部的函数。
asyncio
始终一次执行整个过程,而无需在中间进行任何上下文切换。
您可能认为这pipe_data_received()
是受锁保护的,但实际上该案例不需要任何锁。
当您拥有这样的协同程序时,必须使用锁:
async def incr(self):
await asyncio.sleep(0.1)
self.counter +=1
在后者中incr()
是一个协程,此外,随时可以进行上下文切换sleep()
。如果要保护并行增量,可以使用asyncio.Lock()
:
def __init__(self):
self.counter = 0
self.lock = asyncio.Lock()
async def incr(self):
async with self._lock:
await asyncio.sleep(0.1)
self.counter +=1
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句