Postgres,SQLAlchemy和多处理

埃吉鲁斯·奥尼拉

我是python多重处理的新手,并且正与该主题相关的许多问题苦苦挣扎。我最新的问题是多处理,sqlalchemy和postgres的组合。通过这种组合,我有时会

 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac

经过研究,我在文档中找到了这个提示:

https://docs.sqlalchemy.org/zh/13/core/pooling.html “至关重要的是,使用连接池时以及通过使用通过create_engine()创建的Engine扩展时,池共享都不能共享给TCP连接表示为文件描述符,它们通常跨进程边界工作,这意味着它将代表两个或多个完全独立的Python解释器状态并发访问文件描述符。

有两种解决方法。

第一种是在子进程中创建新的Engine,或者在现有Engine上创建新的Engine,然后在子进程使用任何连接之前调用Engine.dispose()。这将从池中删除所有现有连接,以便建立所有新连接。

和这个:

uWSGI,Flask,sqlalchemy和postgres:SSL错误:解密失败或记录不正确的mac “问题最终是uwsgi的分支。

当使用主进程处理多个进程时,uwsgi会在主进程中初始化应用程序,然后将应用程序复制到每个工作进程中。问题是,如果在初始化应用程序时打开数据库连接,那么您将有多个进程共享同一连接,这将导致上述错误。”

我的解释是,在使用多重处理时,我必须确保每个进程都使用一个新的引擎。在我的子进程中,只有一个类可以读写postgres-db,因此我决定在该类中定义一个slqalchemy引擎:

class WS_DB_Booker():
    def __init__(self):

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self.DBSession_inside_class = scoped_session(session_factory_inside_class)

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.DBSession_inside_class()
            sql_alc_session.query(and_so_on....

这项工作正常,在第一次试用中没有任何问题。但是我不确定这是在类中定义引擎的正确方法还是会导致任何问题?

您如何分叉进程或进行分叉的组件真的不是很容易理解。

您需要确保的是在分叉之后实例化WS_DB_Broker该类

如果您以错误的方式(在fork之前实例化)进行操作,则Engine可能已经在的引用中引用了某些dbapi连接Pool请参阅有关使用引擎的SQLAlchemy文档

为了使您的错误更明显,您可以执行以下操作:

import os

class WS_DB_Booker():
    def __init__(self):
        # Remember the process id from the time of instantiation. If the
        # interpreter is forked then the output of `os.getpid()` will change.
        self._pid = os.getpid()

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self._session = scoped_session(session_factory_inside_class)

    def get_session():
        if self._pid != os.getpid():
             raise RuntimeError("Forked after instantiating! Please fix!")
        return self._session()

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.get_session()  
            #                      ^^^^^^^^^^^^^
            # this may throw RuntimeError when used incorrectly, thus saving you
            # from your own mistake.
            sql_alc_session.query(and_so_on....

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章