ProcessPoolでSQLAlchemy接続を処理する方法は?

ローマ:

RabbitMQブローカーからメッセージをフェッチし、ワーカーメソッドをトリガーしてこれらのメッセージをプロセスプールで処理するリアクターがあり、次のようになっています。

リアクター

これは、Pythonを使用して実装されasyncioloop.run_in_executor()そしてconcurrent.futures.ProcessPoolExecutor

次に、SQLAlchemyを使用してワーカーメソッドでデータベースにアクセスします。ほとんどの場合、処理は非常に単純で迅速なCRUD操作です。

reactorは最初は1秒あたり10〜50のメッセージを処理するため、すべてのリクエストに対して新しいデータベース接続を開くことはできません。むしろ、プロセスごとに1つの永続的な接続を維持したいと考えています。

私の質問は、次のとおりです。それらをグローバル変数に格納できますか?SQA接続プールはこれを処理しますか?原子炉が停止したときにクリーンアップする方法は?

[更新]

  • データベースはMySQLとInnoDBです。

なぜプロセスプールでこのパターンを選択するのですか?

現在の実装では、各コンシューマーが独自のスレッドで実行される異なるパターンを使用しています。どういうわけか、これはうまく機能しません。すでに約200のコンシューマーがそれぞれ独自のスレッドで実行されており、システムは急速に成長しています。より適切にスケーリングするためのアイデアは、懸念事項を分離し、I / Oループでメッセージを消費し、処理をプールに委任することでした。もちろん、システム全体のパフォーマンスは主にI / Oバウンドです。ただし、大きな結果セットを処理する場合、CPUが問題になります。

もう一つの理由は「使いやすさ」でした。接続の処理とメッセージの消費は非同期で実装されますが、ワーカーのコードは同期的で単純なものにすることができます。

すぐに、ワーカー内から永続的なネットワーク接続を介してリモートシステムにアクセスすることが問題であることが明らかになりました。これがCommunicationChannelsの目的です。ワーカー内で、これらのチャネルを通じてメッセージバスへのリクエストを許可できます。

私の現在のアイデアの1つは、DBアクセスを同様の方法で処理することです。ステートメントをキューを介してイベントループに渡し、そこでステートメントがDBに送信されます。ただし、SQLAlchemyでこれを行う方法はわかりません。エントリポイントはどこにありますか?オブジェクトはpickled、キューを通過するときに存在する必要があります。SQAクエリからこのようなオブジェクトを取得するにはどうすればよいですか?データベースとの通信は、イベントループをブロックしないように非同期で機能する必要があります。たとえばaiomysqlをSQAのデータベースドライバーとして使用できますか?

olokki:

プロセスプールプロセスごと1つのデータベース接続の要件はsession、ワーカープロセスでormを使用していると想定し、をインスタンス化する方法に注意を払えば、簡単に満たすことができます。

簡単な解決策は、リクエスト全体で再利用するグローバルセッション用意することです。

# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine)) 

そして労働者の仕事について:

# task.py
from db import engine, DBSession
def task():
    DBSession.begin() # each task will get its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close() # cleanup on task end

create_engineが使用するデフォルトのQueuePoolを引数pool_sizeおよびmax_overflow カスタマイズしますプロセスは、プロセスプール内のプロセスごとに1つの接続のみを維持します。pool_size

再接続したい場合DBSession.remove()は、レジストリからセッションを削除し、次のDBSessionの使用時に再接続させることができます。またrecyclePool引数を使用して、指定した時間が経過した後に接続を再接続することもできます。

開発/デバッグ中にAssertionPool使用すると、プールから複数の接続がチェックアウトされている場合に例外が発生します。その方法については、プール実装の切り替えを参照してください

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

接続を反復処理する方法

分類Dev

新しいHTTPS接続で突然のバーストを処理する方法は?

分類Dev

Angular 7で接続損失ページを処理する方法は?

分類Dev

vb.netでSQL接続を最適に処理する方法は?

分類Dev

node-amqp — Expressアプリで接続を処理する適切な方法は?

分類Dev

expressJSでmongodb接続を処理する

分類Dev

react-redux接続関数でRefを処理する方法

分類Dev

codeigniterphpでエラー処理を接続する方法

分類Dev

codeigniterphpでエラー処理を接続する方法

分類Dev

SpringとDBCPでJDBC接続を処理する適切な方法は何ですか?

分類Dev

NodeJでMongodbのグローバル接続を処理するための最良の方法は何ですか

分類Dev

efコア2で接続を処理するための最良の方法は何ですか

分類Dev

.netコアで依存性注入を使用してdapperでpostgresqldb接続を処理する方法は?

分類Dev

最も簡単な方法でmysql接続例外を処理する方法

分類Dev

複数のソケット接続を処理するための最良の方法は何ですか

分類Dev

NodeJSでMySQL接続を処理する正しい方法はどのようになっていますか

分類Dev

SKProductsRequest-タイムアウト/接続エラーを処理する方法は?

分類Dev

Indexed db apiの接続を処理するためのより良い方法は?

分類Dev

ポートへの接続のバーストを処理する方法は?

分類Dev

Orange3でウィジェット出力の接続イベントを処理する方法は?

分類Dev

try catchブロックでマングース接続エラーを処理する方法は?

分類Dev

ZeroMQ.jsで接続タイムアウトを適切に処理する方法は?

分類Dev

Pythonでソケットの「接続が拒否されました」例外を処理する方法は?

分類Dev

アプリケーション内でAWS-RDS接続を処理する方法は?

分類Dev

PythonマルチスレッドでMySQL接続を処理する方法

分類Dev

Doobieで光接続プールを正しく処理する方法

分類Dev

Spring統合フローでFtpOutboundAdapter接続例外を処理する方法

分類Dev

nodejsでクラスターとのtcp接続を処理する方法

分類Dev

BIGAyncTaskでインターネット接続の変更を処理する方法

Related 関連記事

  1. 1

    接続を反復処理する方法

  2. 2

    新しいHTTPS接続で突然のバーストを処理する方法は?

  3. 3

    Angular 7で接続損失ページを処理する方法は?

  4. 4

    vb.netでSQL接続を最適に処理する方法は?

  5. 5

    node-amqp — Expressアプリで接続を処理する適切な方法は?

  6. 6

    expressJSでmongodb接続を処理する

  7. 7

    react-redux接続関数でRefを処理する方法

  8. 8

    codeigniterphpでエラー処理を接続する方法

  9. 9

    codeigniterphpでエラー処理を接続する方法

  10. 10

    SpringとDBCPでJDBC接続を処理する適切な方法は何ですか?

  11. 11

    NodeJでMongodbのグローバル接続を処理するための最良の方法は何ですか

  12. 12

    efコア2で接続を処理するための最良の方法は何ですか

  13. 13

    .netコアで依存性注入を使用してdapperでpostgresqldb接続を処理する方法は?

  14. 14

    最も簡単な方法でmysql接続例外を処理する方法

  15. 15

    複数のソケット接続を処理するための最良の方法は何ですか

  16. 16

    NodeJSでMySQL接続を処理する正しい方法はどのようになっていますか

  17. 17

    SKProductsRequest-タイムアウト/接続エラーを処理する方法は?

  18. 18

    Indexed db apiの接続を処理するためのより良い方法は?

  19. 19

    ポートへの接続のバーストを処理する方法は?

  20. 20

    Orange3でウィジェット出力の接続イベントを処理する方法は?

  21. 21

    try catchブロックでマングース接続エラーを処理する方法は?

  22. 22

    ZeroMQ.jsで接続タイムアウトを適切に処理する方法は?

  23. 23

    Pythonでソケットの「接続が拒否されました」例外を処理する方法は?

  24. 24

    アプリケーション内でAWS-RDS接続を処理する方法は?

  25. 25

    PythonマルチスレッドでMySQL接続を処理する方法

  26. 26

    Doobieで光接続プールを正しく処理する方法

  27. 27

    Spring統合フローでFtpOutboundAdapter接続例外を処理する方法

  28. 28

    nodejsでクラスターとのtcp接続を処理する方法

  29. 29

    BIGAyncTaskでインターネット接続の変更を処理する方法

ホットタグ

アーカイブ