私は、反応性、非同期でPostgreSQLデータベースポーラーを交換しようとしていますpostgresの-非同期ドライバなどの春5 Webflux反応用WebSocketクライアントに新たに挿入された行と列ジョシュロングの素晴らしい例は、デモここに基づくセバスチャン・ドゥルーズの 春反応性-遊び場。
MyPublisher
は最初のを取得しますが、row
後続の行を返しません。私Observable
、私のPublisher
、またはpostgres-async-driverの使用方法に問題がありDb
ますか?
public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
return
// com.github.pgasync.Db
db.queryRows(sql)
// ~RowMapper method
.map(row -> mapRowToDto(row))
// serialize dto to String for websocket
.map(dto -> { return objectMapper.writeValueAsString(dto); })
// finally, write to websocket session
.map(str -> { return session.textMessage((String) str);
});
}
次に、コンバーターを使用Observable
してに配線します。WebSocketHandler
RxReactiveStream.toPublisher
@Bean
WebSocketHandler dbWebSocketHandler() {
return session -> {
Observable<WebSocketMessage> o = getObservableWSM(session);
return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
};
}
それはrow
私のsql
ステートメントから最初のものを取得しますが、追加の行はありません。追加の行をストリーミングし続けるにはどうすればよいですか?
理想的には、PostgreSQLでMongoDBTailableカーソルに相当するものが必要だと思います。
この例に基づいて、テーブルに対してsで起動するPostgresトリガーを作成しました。INSERT
CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
id bigint;
BEGIN
IF TG_OP = 'INSERT' THEN
id = NEW.id;
ELSE
id = OLD.id;
END IF;
PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
次に、reactive-pg-clientを使用してそのPostgresトリガーをサブスクライブしました。Pub / Subの例のコードは次のとおりです。
@Bean
PgPool subscribedNotificationHandler() {
PgPool client = pgPool();
client.getConnection(asyncResult -> {
if (asyncResult.succeeded()) {
PgConnection connection = asyncResult.result();
connection.notificationHandler(notification -> {
notification.getPayload();
// do things with payload
});
connection.query("LISTEN my_trigger_name", ar -> {
log.info("Subscribed to channel");
});
}
});
return client;
}
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加