Celery 및 Gino를 사용하여 개체 저장

알렉산더 스코 핀 세프

다음 파이프 라인이 있습니다.

이벤트 모델 (Gino 'db'개체 기반) :

class Event(db.Model):
    __tablename__ = "events"

    id = db.Column(db.BigInteger(), primary_key=True)
    request_timestamp = db.Column(db.DateTime(), nullable=False)
    service = db.Column(db.String(), nullable=False)
    url = db.Column(db.String(), nullable=False)
    status_code = db.Column(db.Integer(), nullable=False)
    response_time = db.Column(db.DateTime(), nullable=False)

FastApi 앱에는 Event 개체의 데이터를 전달하기 위해 셀러리 작업을 호출하는 POST보기가 있습니다.

@router.post("/events/add")
async def add_event(event: EventModel):
    event_data = {'request_timestamp': event.request_timestamp.replace(tzinfo=None),
                  'service': event.service,
                  'url': event.url,
                  'status_code': event.status_code,
                  'response_time': event.response_time.replace(tzinfo=None)
                  }

    task = celery_app.send_task("monitoring_service.src.monitoring_service.worker.celery_worker.add_to_db",
                                kwargs=event_data)

    return JSONResponse(content="Event recorded successfully", status_code=200)

그리고 데이터베이스에 Event 객체를 저장해야하는 셀러리 작업자 :

@celery_app.task(acks_late=True)
async def add_to_db(request_timestamp, service, url, status_code, response_time):
    event = await Event.create(
        request_timestamp=datetime.strptime(request_timestamp, '%Y-%m-%dT%H:%M:%S.%f'),
        service=service,
        url=url,
        status_code=status_code,
        response_time=datetime.strptime(response_time, '%Y-%m-%dT%H:%M:%S.%f'),
    )

    return {"status": True}

이제 개체 저장 프로세스 중에 오류가 발생합니다.

Traceback (most recent call last):

File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 479, in trace_task

mark_as_done(

File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 158, in mark_as_done

self.store_result(task_id, result, state, request=request)

File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 442, in store_result

self._store_result(task_id, result, state, traceback,

File "/usr/local/lib/python3.8/site-packages/celery/backends/database/__init__.py", line 51, in _inner

return fun(*args, **kwargs)

File "/usr/local/lib/python3.8/site-packages/celery/backends/database/__init__.py", line 130, in _store_result

session.commit()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1042, in commit

self.transaction.commit()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 504, in commit

self._prepare_impl()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl

self.session.flush()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2523, in flush

self._flush(objects)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush

transaction.rollback(_capture_exception=True)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__

compat.raise_(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_

raise exception

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush

flush_context.execute()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute

rec.execute(self)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute

persistence.save_obj(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj

_emit_update_statements(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 994, in _emit_update_statements

c = cached_connections[connection].execute(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1014, in execute

return meth(self, multiparams, params)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection

return connection._execute_clauseelement(self, multiparams, params)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1127, in _execute_clauseelement

ret = self._execute_context(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1207, in _execute_context

self._handle_dbapi_exception(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception

util.raise_(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_

raise exception

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1205, in _execute_context

context = constructor(dialect, self, conn, *args)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 858, in _init_compiled

param = dict(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 861, in <genexpr>

processors[key](compiled_params[key])

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/sqltypes.py", line 1689, in process

value = dumps(value, protocol)

sqlalchemy.exc.SQLAlchemyError: (builtins.TypeError) cannot pickle 'coroutine' object


warn(RuntimeWarning(

[2021-02-19 10:19:39,010: ERROR/ForkPoolWorker-1] Task monitoring_service.src.monitoring_service.worker.celery_worker.add_to_db[994ae754-f216-4fa9-b502-5fbb0221011c] raised unexpected: SQLAlchemyError("(builtins.TypeError) cannot pickle 'coroutine' object")

Traceback (most recent call last):

File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 479, in trace_task

mark_as_done(

File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 158, in mark_as_done

self.store_result(task_id, result, state, request=request)

File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 442, in store_result

self._store_result(task_id, result, state, traceback,

File "/usr/local/lib/python3.8/site-packages/celery/backends/database/__init__.py", line 51, in _inner

return fun(*args, **kwargs)

File "/usr/local/lib/python3.8/site-packages/celery/backends/database/__init__.py", line 130, in _store_result

session.commit()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1042, in commit

self.transaction.commit()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 504, in commit

self._prepare_impl()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl

self.session.flush()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2523, in flush

self._flush(objects)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush

transaction.rollback(_capture_exception=True)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__

compat.raise_(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_

raise exception

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush

flush_context.execute()

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute

rec.execute(self)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute

persistence.save_obj(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj

_emit_update_statements(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 994, in _emit_update_statements

c = cached_connections[connection].execute(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1014, in execute

return meth(self, multiparams, params)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection

return connection._execute_clauseelement(self, multiparams, params)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1127, in _execute_clauseelement

ret = self._execute_context(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1207, in _execute_context

self._handle_dbapi_exception(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception

util.raise_(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_

raise exception

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1205, in _execute_context

context = constructor(dialect, self, conn, *args)

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 858, in _init_compiled

param = dict(

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 861, in <genexpr>

processors[key](compiled_params[key])

File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/sqltypes.py", line 1689, in process

value = dumps(value, protocol)

sqlalchemy.exc.SQLAlchemyError: (builtins.TypeError) cannot pickle 'coroutine' object

event = await Event.create(...)뷰 내에서 호출하면 성공하고 데이터를 db에 저장합니다. Celery 작업자에서 'async / await'를 삭제하면이 오류가 표시되지 않고 Celery 작업은 'SUCCESS'상태이지만 데이터베이스가 비어 있습니다. 옳지 않은 것이 무엇인지 정말로 이해할 수 없습니다.

프랭키 567

단순함 : 현재 Celery는 비동기 기능을 작업으로 처리 할 수 ​​없습니다. asyncio.run( https://docs.python.org/3/library/asyncio-task.html#asyncio.run ) 으로 래핑해야합니다 .

async def add_to_db(request_timestamp, service, url, status_code, response_time):
    event = await Event.create(
        request_timestamp=datetime.strptime(request_timestamp, '%Y-%m-%dT%H:%M:%S.%f'),
        service=service,
        url=url,
        status_code=status_code,
        response_time=datetime.strptime(response_time, '%Y-%m-%dT%H:%M:%S.%f'),
    )

@celery_app.task(acks_late=True)
def add_to_db_task(request_timestamp, service, url, status_code, response_time):
    asyncio.run(add_to_db(request_timestamp, service, url, status_code, response_time))
    return {"status": True}

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Application 개체를 사용하여 데이터 및 전역 변수 저장

분류에서Dev

JavaScript 저장 프로 시저를 사용하여 ID 교체 및 업데이트

분류에서Dev

개체 및 메서드를 사용하여 구분 기호로 구분 된 목록에 텍스트 저장

분류에서Dev

ormlite Android를 사용하여 개체 필드 저장

분류에서Dev

키를 사용하여 데이터 저장소 개체 검색

분류에서Dev

HTML 5를 사용하여 개체 저장

분류에서Dev

Entity Framework를 사용하여 SQL Server에 개체 목록 저장

분류에서Dev

Mongoose 및 Node를 사용하여 레코드를 저장하기 전에 여러 개체 참조를 추가하려면 어떻게해야합니까?

분류에서Dev

NSKeyedArchiver를 사용하여 UserDefaults에 사용자 지정 개체 저장 시도

분류에서Dev

SQL Server에서 매개 변수를 사용하여 저장 프로 시저 생성 및 실행

분류에서Dev

JMeter JDBC 요청 및 매개 변수화를 사용하여 저장 프로 시저의 병렬 실행

분류에서Dev

iOS 및 OSX에 사용자 지정 개체를로드하고 저장하는 방법은 무엇입니까?

분류에서Dev

매개 변수 및 쿼리를 사용하는 MySQL 저장 프로 시저

분류에서Dev

mPDF를 사용하여 여러 pdf 파일 생성 및 저장

분류에서Dev

django_storages를 사용하여 베개 객체를 S3에 저장

분류에서Dev

openssl을 사용하여 인증서 체인 저장 및 검색

분류에서Dev

HashMap을 사용하여 Java에서 객체 저장 및 검색

분류에서Dev

기본 및 파생 클래스 개체를 함께 저장하는 C ++

분류에서Dev

Djcelery없이 Celery 및 Django를 사용하여 Raven 구성

분류에서Dev

Djcelery없이 Celery 및 Django를 사용하여 Raven 구성

분류에서Dev

사용자 지정 개체를 데이터로 변환하여 NSUserDefauts에 저장

분류에서Dev

sencha touch를 사용하여 localStorage에 배열 데이터 개체 저장

분류에서Dev

Rhino Mocks를 사용하여 저장소 개체의 스터 빙 함수 표현

분류에서Dev

For 루프를 사용하여 Grep을 사용하여 파일 정렬 및 저장

분류에서Dev

For 루프를 사용하여 Grep을 사용하여 파일 정렬 및 저장

분류에서Dev

반응 형 프로그래밍 및 JPA 저장소를 사용하여 DB에 객체 유지

분류에서Dev

개인적인 사용 및 기여를위한 Fork GitHub 저장소

분류에서Dev

로컬 저장소를 사용하여 중첩 된 개체에 동적 키로 데이터 저장

분류에서Dev

mongoose 및 nodejs를 사용하여 mongodb에 모델 배열 저장

Related 관련 기사

  1. 1

    Application 개체를 사용하여 데이터 및 전역 변수 저장

  2. 2

    JavaScript 저장 프로 시저를 사용하여 ID 교체 및 업데이트

  3. 3

    개체 및 메서드를 사용하여 구분 기호로 구분 된 목록에 텍스트 저장

  4. 4

    ormlite Android를 사용하여 개체 필드 저장

  5. 5

    키를 사용하여 데이터 저장소 개체 검색

  6. 6

    HTML 5를 사용하여 개체 저장

  7. 7

    Entity Framework를 사용하여 SQL Server에 개체 목록 저장

  8. 8

    Mongoose 및 Node를 사용하여 레코드를 저장하기 전에 여러 개체 참조를 추가하려면 어떻게해야합니까?

  9. 9

    NSKeyedArchiver를 사용하여 UserDefaults에 사용자 지정 개체 저장 시도

  10. 10

    SQL Server에서 매개 변수를 사용하여 저장 프로 시저 생성 및 실행

  11. 11

    JMeter JDBC 요청 및 매개 변수화를 사용하여 저장 프로 시저의 병렬 실행

  12. 12

    iOS 및 OSX에 사용자 지정 개체를로드하고 저장하는 방법은 무엇입니까?

  13. 13

    매개 변수 및 쿼리를 사용하는 MySQL 저장 프로 시저

  14. 14

    mPDF를 사용하여 여러 pdf 파일 생성 및 저장

  15. 15

    django_storages를 사용하여 베개 객체를 S3에 저장

  16. 16

    openssl을 사용하여 인증서 체인 저장 및 검색

  17. 17

    HashMap을 사용하여 Java에서 객체 저장 및 검색

  18. 18

    기본 및 파생 클래스 개체를 함께 저장하는 C ++

  19. 19

    Djcelery없이 Celery 및 Django를 사용하여 Raven 구성

  20. 20

    Djcelery없이 Celery 및 Django를 사용하여 Raven 구성

  21. 21

    사용자 지정 개체를 데이터로 변환하여 NSUserDefauts에 저장

  22. 22

    sencha touch를 사용하여 localStorage에 배열 데이터 개체 저장

  23. 23

    Rhino Mocks를 사용하여 저장소 개체의 스터 빙 함수 표현

  24. 24

    For 루프를 사용하여 Grep을 사용하여 파일 정렬 및 저장

  25. 25

    For 루프를 사용하여 Grep을 사용하여 파일 정렬 및 저장

  26. 26

    반응 형 프로그래밍 및 JPA 저장소를 사용하여 DB에 객체 유지

  27. 27

    개인적인 사용 및 기여를위한 Fork GitHub 저장소

  28. 28

    로컬 저장소를 사용하여 중첩 된 개체에 동적 키로 데이터 저장

  29. 29

    mongoose 및 nodejs를 사용하여 mongodb에 모델 배열 저장

뜨겁다태그

보관