私はasyncioを使用して、行ごとに解析し、表示された出力に基づいてさまざまなことを行う出力を出力するサブプロセスを実行しています。このプロセスにタイムアウトを設定したいのですが、プロセス全体の存続期間中はグローバルタイムアウトであってはなりません。代わりに、プロセスから特定の出力が表示された場合は常に、タイムアウトをリセットして最初からやり直したいと思います。どうすればこれを実装できますか?
グローバルタイムアウトの場合、これは機能していて簡単asyncio.wait_for(_foo(), timeout)
です。単にを呼び出します。しかし、タイムアウトをリセットしてもこれを機能させることはできません。これが私がこれまでに持っているものです:
# here invocation is my own data structure with some bookkeeping information in it
# (such as the start time from which I want to base my timeout decisions on).
# and process is the value returned by await asyncio.create_subprocess_exec(...)
# _run_one_invocation() is my own function which is just a stdout readline loop
# and some dispatching.
# Make a Task out of the co-routine so that we decide when it gets cancelled, not Python.
run_task = asyncio.Task(_run_one_invocation(invocation, process))
while True:
try:
# Use asyncio.shield so it doesn't get cancelled if the timeout expires
await asyncio.shield(asyncio.wait_for(run_task, 1))
# If the await returns without raising an exception, we got to EOF and we're done.
break
except asyncio.TimeoutError:
# If it's been too long since last reset, this is a "real" timeout.
duration = time.time() - invocation.start_time
if duration > timeout:
run_task.cancel()
raise
私はこれを実行すると、呼び出し、if文がrun_task.cancel()
されていない入力されている、と私はループの先頭に戻り、呼び出しにもかかわらずときasyncio.wait_for()
再びそれはすぐに上げますasyncio.CancelledError
。
私は何が間違っているのですか?
問題を修正し、コードを単純化するには、完全に回避してwait_for()
(したがってshield()
)、wait(return_when=FIRST_COMPLETED)
必要な種類のタイムアウトを実装するために使用します。
run_task = asyncio.create_task(_run_one_invocation(invocation, process))
while True:
await asyncio.wait([run_task], timeout=1)
if run_task.done():
break
if time.time() - invocation.start_time > timeout:
run_task.cancel()
raise asyncio.TimeoutErrror()
このアプローチの欠点は、1秒のウェイクアップが導入され、タスクが何時間も休止している場合でも、プログラム(およびその結果としてコンピューター)がスリープ状態になるのを防ぐことです。サーバーではおそらく大したことではありませんが、そのような方法はラップトップのバッテリーの消耗につながるため、回避することをお勧めします。また、1秒のスリープでは、タイムアウトの変化に反応するために最大1秒の遅延が発生します。
これを解決するには、タイムアウトを変更するコードによって発生するイベントを作成し、タイムアウトとタスクの完了に加えて、そのイベントに反応します。
timeout_changed = asyncio.Event()
# pass timeout_changed where needed, and have the code that changes
# the timeout also call timeout_changed.set()
run_task = asyncio.create_task(_run_one_invocation(invocation, process))
while True:
remaining = timeout - (time.time() - invocation.start_time)
timeout_changed_task = asyncio.ensure_future(timeout_changed.wait())
await asyncio.wait([run_task, timeout_changed_task],
return_when=asyncio.FIRST_COMPLETED, timeout=remaining)
timeout_changed_task.cancel()
# either: 1) the task has completed, 2) the previous timeout has
# expired, or 3) the timeout has changed
if run_task.done():
break # 1
if time.time() - invocation.start_time > timeout:
# 2 or 2+3
run_task.cancel()
raise asyncio.TimeoutErrror()
# 3 - continue waiting with the new timeout
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加