FastAPI에서 다중 처리를 수행하는 방법

울음 소파

FastAPI 요청을 처리하는 동안 목록의 모든 요소에 대해 수행 할 CPU 바인딩 작업이 있습니다. 여러 CPU 코어에서이 처리를 수행하고 싶습니다.

FastAPI 내에서이를 수행하는 적절한 방법은 무엇입니까? 표준 multiprocessing모듈을 사용할 수 있습니까 ? 지금까지 찾은 모든 자습서 / 질문은 웹 요청과 같은 I / O 바인딩 된 작업 만 다룹니다.

alex_noname

async def 끝점

ProcessPoolExecutor 와 함께 loop.run_in_executor사용 하여 별도의 프로세스에서 함수를 시작할 수 있습니다 .

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def 끝점

def엔드 포인트는 별도의 스레드에서 암시 적 으로 실행 되므로 모듈 다중 처리동시 처리 의 모든 기능을 사용할 수 있습니다 . 내부 def기능 await은 사용할 수 없습니다. 견본:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

참고 : 엔드 포인트에서 프로세스 풀을 생성하고 많은 수의 스레드를 생성하면 요청 수가 증가함에 따라 응답 속도가 느려질 수 있다는 점을 기억해야합니다.


즉석에서 실행

별도의 프로세스에서 함수를 실행하고 즉시 결과를 기다리는 가장 쉽고 기본적인 방법은 ProcessPoolExecutor 와 함께 loop.run_in_executor 를 사용하는 입니다 .

아래 예에서와 같이 풀은 응용 프로그램이 시작될 때 생성 될 수 있으며 응용 프로그램 종료시 종료하는 것을 잊지 마십시오. 풀에서 사용되는 프로세스 수는 max_workers ProcessPoolExecutor 생성자 매개 변수를 사용하여 설정할 수 있습니다 . 하면 max_workers된다 None또는 제공하지, 그것은 기계의 프로세서 수에 기본 설정됩니다.

이 접근 방식의 단점은 요청 처리기 (경로 작업)가 별도의 프로세스에서 계산이 완료 될 때까지 대기하는 반면 클라이언트 연결은 열려 있다는 것입니다. 그리고 어떤 이유로 연결이 끊어지면 결과를 반환 할 곳이 없습니다.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

배경으로 이동

일반적으로 CPU 바운드 작업은 백그라운드에서 실행됩니다. FastAPI는 실행 할 수있는 기능 제공 백그라운드 작업이 실행되도록 한 후 시작하고 비동기 CPU 바인딩 작업의 결과를 기다릴 수있는 응답, 내부를 반환.

예를 들어이 경우 "Accepted"(HTTP 코드 202) 의 응답 과 고유 한 작업을 즉시 반환 ID하고 백그라운드에서 계산을 계속할 수 있으며 클라이언트는 나중에 this를 사용하여 작업의 상태를 요청할 수 있습니다 ID.

BackgroundTasks특히 몇 가지 기능을 제공합니다 (종속성 포함). 그리고 그들에서 당신은 모든 작업이 완료되었을 때만 정리되는 종속성에서 얻은 리소스를 사용할 수 있으며 예외의 경우 올바르게 처리 할 수 ​​있습니다. 이것은이 다이어그램 에서 더 명확하게 볼 수 있습니다 .

다음은 최소한의 작업 추적을 수행하는 예입니다. 실행중인 응용 프로그램의 한 인스턴스가 가정됩니다.

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

더 강력한 솔루션

위의 모든 예는 아주 간단했다,하지만 당신은 분산 컴퓨팅 무거운에 대한 좀 더 강력한 시스템을 필요로하는 경우에, 당신은 메시지 브로커 따로 볼 수 RabbitMQ, Kafka, NATS셀러리 같은 themthem을 사용하는 등 및 라이브러리를.

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

C #에서 다중 스레딩을 사용하여 일괄 처리를 수행하는 방법

분류에서Dev

for 루프에서 다중 처리를 사용하는 방법-Python

분류에서Dev

__main__.py에서 python3 다중 처리를 처리하는 방법

분류에서Dev

대용량 파일 Python에서 다중 처리를 수행하는 가장 좋은 방법

분류에서Dev

다중 처리 함수에서 인수를 전달하는 방법과 다중 처리 목록을 사용하는 방법은 무엇입니까?

분류에서Dev

클래스 내에서 Python에서 다중 처리를 사용하는 방법

분류에서Dev

AR에서 다중 조인으로 쿼리를 수행하는 방법

분류에서Dev

Django에서 다중 파일 입력 필드를 처리하는 방법

분류에서Dev

Python에서 다중 처리를 구현하는 방법은 무엇입니까?

분류에서Dev

다중 다항 회귀에서 비가역 행렬을 처리하는 방법

분류에서Dev

Mongoose에서 역 중첩 쿼리를 수행하는 방법

분류에서Dev

asyncio (병렬 / 다중 처리)를 사용하여 Python에서 수학 연산을 수행하는 방법은 무엇입니까?

분류에서Dev

FastAPI에서 파일 다중 파일 업로드 필드를 선택 필드로 설정하는 방법

분류에서Dev

다음 중첩 루프에 대해 python3.x에서 다중 처리를 적용하는 방법

분류에서Dev

Ruby에서 Faker의 중복 키를 처리하는 방법

분류에서Dev

Android에서 중첩 된 JSON 개체를 처리하는 방법

분류에서Dev

Python 다중 처리에서 특정 키워드 인수를 지정하는 방법

분류에서Dev

"WPF : 다중 XAML UI를 처리하는 방법"에 대해 논의하십시오. 발행물

분류에서Dev

슬라이딩 윈도우에 다중 처리를 적용하는 방법

분류에서Dev

스레딩 / 다중 처리를 구현하는 방법에 대해 혼동

분류에서Dev

Azure Service Bus에 대한 다중 액세스를 처리하는 방법

분류에서Dev

Swagger 2.0에서 다중 400 응답을 처리하는 방법

분류에서Dev

함수를 다중 처리하는 방법

분류에서Dev

Spring에서 다중 파일 업로드를 수행하는 방법-Java Config

분류에서Dev

재귀를 수행하는 for 루프에서 promise를 처리하는 방법

분류에서Dev

Laravel 5.1에서 Eloquent를 사용하여 원시 쿼리로 다중 선택을 수행하는 방법

분류에서Dev

형식에서 다음 관계를 처리하는 방법

분류에서Dev

다중 처리에서 Pool을 사용할 때 함수에서 프로세스 번호를 얻는 방법

분류에서Dev

Excel에서 다른 쿼리를 수행하는 경우 다중 수준을 수행하는 방법은 무엇입니까?

Related 관련 기사

  1. 1

    C #에서 다중 스레딩을 사용하여 일괄 처리를 수행하는 방법

  2. 2

    for 루프에서 다중 처리를 사용하는 방법-Python

  3. 3

    __main__.py에서 python3 다중 처리를 처리하는 방법

  4. 4

    대용량 파일 Python에서 다중 처리를 수행하는 가장 좋은 방법

  5. 5

    다중 처리 함수에서 인수를 전달하는 방법과 다중 처리 목록을 사용하는 방법은 무엇입니까?

  6. 6

    클래스 내에서 Python에서 다중 처리를 사용하는 방법

  7. 7

    AR에서 다중 조인으로 쿼리를 수행하는 방법

  8. 8

    Django에서 다중 파일 입력 필드를 처리하는 방법

  9. 9

    Python에서 다중 처리를 구현하는 방법은 무엇입니까?

  10. 10

    다중 다항 회귀에서 비가역 행렬을 처리하는 방법

  11. 11

    Mongoose에서 역 중첩 쿼리를 수행하는 방법

  12. 12

    asyncio (병렬 / 다중 처리)를 사용하여 Python에서 수학 연산을 수행하는 방법은 무엇입니까?

  13. 13

    FastAPI에서 파일 다중 파일 업로드 필드를 선택 필드로 설정하는 방법

  14. 14

    다음 중첩 루프에 대해 python3.x에서 다중 처리를 적용하는 방법

  15. 15

    Ruby에서 Faker의 중복 키를 처리하는 방법

  16. 16

    Android에서 중첩 된 JSON 개체를 처리하는 방법

  17. 17

    Python 다중 처리에서 특정 키워드 인수를 지정하는 방법

  18. 18

    "WPF : 다중 XAML UI를 처리하는 방법"에 대해 논의하십시오. 발행물

  19. 19

    슬라이딩 윈도우에 다중 처리를 적용하는 방법

  20. 20

    스레딩 / 다중 처리를 구현하는 방법에 대해 혼동

  21. 21

    Azure Service Bus에 대한 다중 액세스를 처리하는 방법

  22. 22

    Swagger 2.0에서 다중 400 응답을 처리하는 방법

  23. 23

    함수를 다중 처리하는 방법

  24. 24

    Spring에서 다중 파일 업로드를 수행하는 방법-Java Config

  25. 25

    재귀를 수행하는 for 루프에서 promise를 처리하는 방법

  26. 26

    Laravel 5.1에서 Eloquent를 사용하여 원시 쿼리로 다중 선택을 수행하는 방법

  27. 27

    형식에서 다음 관계를 처리하는 방법

  28. 28

    다중 처리에서 Pool을 사용할 때 함수에서 프로세스 번호를 얻는 방법

  29. 29

    Excel에서 다른 쿼리를 수행하는 경우 다중 수준을 수행하는 방법은 무엇입니까?

뜨겁다태그

보관