httpx를 사용하여 코 루틴 내부의 여러 동시 HTTP 스트리밍 요청에서 읽고 , 최종 데이터를 반환하는 대신 이벤트 루프를 실행하는 비동기 함수로 데이터를 되돌리고 싶습니다.
그러나 비동기 함수를 반환하는 대신 yield로 만들면 불평을 asyncio.as_completed()
받고 loop.run_until_complete()
비동기 생성기가 아닌 코 루틴 또는 Future를 기대합니다.
그래서 이것이 작동하도록 할 수있는 유일한 방법은 각 코 루틴 내에서 스트리밍 된 모든 데이터를 수집하고 요청이 완료되면 모든 데이터를 반환하는 것입니다. 그런 다음 모든 코 루틴 결과를 수집하고 마지막으로 비동기 호출 함수로 반환합니다.
즉 , 모든 것을 메모리에 보관하고 모든 데이터를 얻기 전에 가장 느린 요청이 완료 될 때까지 기다려야하므로 HTTP 요청 스트리밍의 전체 지점을 무력화합니다.
이런 일을 할 수있는 방법이 있습니까? 내 현재 어리석은 구현은 다음과 같습니다.
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data += await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
편집 : 나는 asyncio.Queue
또한 trio
메모리 채널을 사용하여 몇 가지 솔루션을 시도했지만 비동기 범위의 항목에서만 읽을 수 있기 때문에 솔루션에 더 가까이 가지 않습니다.
편집 2 : 비동기 생성기에서 이것을 사용하려는 이유는 Django Rest Framework 스트리밍 API를 사용하여 Django 앱에서 사용하고 싶기 때문입니다.
일반적으로 비 collect_data
동기화하고 전체적으로 비동기 코드를 사용해야합니다. 이것이 asyncio가 사용되도록 설계된 방식입니다. 그러나 이것이 어떤 이유로 실행 가능하지 않은 경우 몇 가지 글루 코드를 적용하여 비동기 반복기를 수동으로 반복 할 수 있습니다 .
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
위의 작동 방식은 __anext__
매직 메서드를 사용하여 비동기 반복기에서 값을 계속 검색 하고 객체가 도착할 때 반환 하는 비동기 클로저를 제공하는 것입니다. 이 비동기 클로저는 run_until_complete()
일반 동기화 생성기 내부의 루프에서 호출됩니다 . (클로저는 실제로 지원되지 않을 수있는를 StopAsyncIteration
통해 전파되는 것을 방지하기 위해 완료 표시기와 실제 객체 쌍을 반환합니다 run_until_complete
.)
이를 통해 execute_tasks
비동기 생성기 ( async def
with yield
)를 만들고 다음을 사용하여 반복 할 수 있습니다.
for chunk in iter_over_async(execute_tasks(urls), loop):
...
이 접근 방식은와 호환되지 asyncio.run
않으며 나중에 문제가 발생할 수 있습니다.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다