RabbitMQブローカーからメッセージをフェッチし、ワーカーメソッドをトリガーしてこれらのメッセージをプロセスプールで処理するリアクターがあり、次のようになっています。
これは、Pythonを使用して実装されasyncio
、loop.run_in_executor()
そしてconcurrent.futures.ProcessPoolExecutor
。
次に、SQLAlchemyを使用してワーカーメソッドでデータベースにアクセスします。ほとんどの場合、処理は非常に単純で迅速なCRUD操作です。
reactorは最初は1秒あたり10〜50のメッセージを処理するため、すべてのリクエストに対して新しいデータベース接続を開くことはできません。むしろ、プロセスごとに1つの永続的な接続を維持したいと考えています。
私の質問は、次のとおりです。それらをグローバル変数に格納できますか?SQA接続プールはこれを処理しますか?原子炉が停止したときにクリーンアップする方法は?
[更新]
なぜプロセスプールでこのパターンを選択するのですか?
現在の実装では、各コンシューマーが独自のスレッドで実行される異なるパターンを使用しています。どういうわけか、これはうまく機能しません。すでに約200のコンシューマーがそれぞれ独自のスレッドで実行されており、システムは急速に成長しています。より適切にスケーリングするためのアイデアは、懸念事項を分離し、I / Oループでメッセージを消費し、処理をプールに委任することでした。もちろん、システム全体のパフォーマンスは主にI / Oバウンドです。ただし、大きな結果セットを処理する場合、CPUが問題になります。
もう一つの理由は「使いやすさ」でした。接続の処理とメッセージの消費は非同期で実装されますが、ワーカーのコードは同期的で単純なものにすることができます。
すぐに、ワーカー内から永続的なネットワーク接続を介してリモートシステムにアクセスすることが問題であることが明らかになりました。これがCommunicationChannelsの目的です。ワーカー内で、これらのチャネルを通じてメッセージバスへのリクエストを許可できます。
私の現在のアイデアの1つは、DBアクセスを同様の方法で処理することです。ステートメントをキューを介してイベントループに渡し、そこでステートメントがDBに送信されます。ただし、SQLAlchemyでこれを行う方法はわかりません。エントリポイントはどこにありますか?オブジェクトはpickled
、キューを通過するときに存在する必要があります。SQAクエリからこのようなオブジェクトを取得するにはどうすればよいですか?データベースとの通信は、イベントループをブロックしないように非同期で機能する必要があります。たとえばaiomysqlをSQAのデータベースドライバーとして使用できますか?
プロセスプールプロセスごとに1つのデータベース接続の要件はsession
、ワーカープロセスでormを使用していると想定して、をインスタンス化する方法に注意を払えば、簡単に満たすことができます。
簡単な解決策は、リクエスト全体で再利用するグローバルセッションを用意することです。
# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))
そして労働者の仕事について:
# task.py
from db import engine, DBSession
def task():
DBSession.begin() # each task will get its own transaction over the global connection
...
DBSession.query(...)
...
DBSession.close() # cleanup on task end
create_engineが使用するデフォルトのQueuePoolを引数pool_size
およびmax_overflow
カスタマイズします。プロセスは、プロセスプール内のプロセスごとに1つの接続のみを維持します。pool_size
再接続したい場合DBSession.remove()
は、レジストリからセッションを削除し、次のDBSessionの使用時に再接続させることができます。またrecycle
、Poolの引数を使用して、指定した時間が経過した後に接続を再接続することもできます。
開発/デバッグ中にAssertionPoolを使用すると、プールから複数の接続がチェックアウトされている場合に例外が発生します。その方法については、プール実装の切り替えを参照してください。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加