Spring Webflux Async PostgreSQLPublisherが最初の結果後に停止する

JJザブカー

私は、反応性、非同期でPostgreSQLデータベースポーラーを交換しようとしていますpostgresの-非同期ドライバなどの春5 Webflux反応用WebSocketクライアントに新たに挿入された行と列ジョシュロングの素晴らしい例は、デモここに基づくセバスチャン・ドゥルーズの 春反応性-遊び場

MyPublisherは最初のを取得しますが、row後続の行を返しません。Observable、私のPublisher、またはpostgres-async-driverの使用方法に問題がありDbますか?

public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
    return
        // com.github.pgasync.Db
        db.queryRows(sql)
        // ~RowMapper method
        .map(row -> mapRowToDto(row))
        // serialize dto to String for websocket
        .map(dto -> { return objectMapper.writeValueAsString(dto); })
        // finally, write to websocket session 
        .map(str -> { return session.textMessage((String) str);
        });
}

次に、コンバーターを使用Observableしてに配線します。WebSocketHandlerRxReactiveStream.toPublisher

@Bean
WebSocketHandler dbWebSocketHandler() {
    return session -> {
        Observable<WebSocketMessage> o = getObservableWSM(session);
        return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
    };
}

それはrow私のsqlステートメントから最初のものを取得しますが、追加の行はありません。追加の行をストリーミングし続けるにはどうすればよいですか?

理想的には、PostgreSQLでMongoDBTailableカーソルに相当するものが必要だと思います

JJザブカー

この例に基づいて、テーブルに対してsで起動するPostgresトリガーを作成しましたINSERT

CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;
  PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

次に、reactive-pg-clientを使用してそのPostgresトリガーをサブスクライブしましたPub / Subの例のコードは次のとおりです。

@Bean
PgPool subscribedNotificationHandler() {
    PgPool client = pgPool();
    client.getConnection(asyncResult -> {
        if (asyncResult.succeeded()) {
            PgConnection connection = asyncResult.result();
            connection.notificationHandler(notification -> {
                notification.getPayload();
                // do things with payload
            });
            connection.query("LISTEN my_trigger_name", ar -> {
                log.info("Subscribed to channel");
            });
        }
    });
    return client;
}

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

Spring Webflux Async PostgreSQLPublisherが最初の結果後に停止する

分類Dev

Spring @Async vs Spring WebFlux

分類Dev

Spring webflux、 `ServerResponse`のテスト

分類Dev

Spring webflux、 `ServerResponse`のテスト

分類Dev

Spring webfluxでのflatMap()とsubscribe()

分類Dev

Spring Security AbstractAuthenticationProcessingFilterをWebFluxに移行する

分類Dev

Spring WebFlux(Flux):動的に公開する方法

分類Dev

Spring WebFlux(Flux):動的に公開する方法

分類Dev

Spring WebFlux/ Reactor core

分類Dev

Capturing response in spring Webflux

分類Dev

Spring WebFlux with Kafka and Websockets

分類Dev

Spring webflux testing

分類Dev

Spring Webfluxが404を返す(Not Foud)

分類Dev

Spring WebFluxでJaegerを使用する方法は?

分類Dev

Spring WebFlux WebClient:実行を遅らせる

分類Dev

春WebFlux対Spring MVCの(非同期)

分類Dev

Spring Security WebFlux-認証付きの本体

分類Dev

APIのSpring webfluxカスタム認証

分類Dev

Spring 5 webfluxにHandlerFunctionsがあるのはなぜですか?

分類Dev

Spring Framework WebFlux Reactive Programming

分類Dev

Spring WebFlux webclient handle ConnectTimeoutException

分類Dev

Java Spring WebFlux vs RxJava

分類Dev

Spring WebFluxにSpringCloudSleuthのTracingWebFillterを実装する

分類Dev

Spring Boot 2.2へのアップグレード後にSpringboot WebFluxテストが失敗する

分類Dev

Spring-WebFlux Fluxがコンテキストで失敗する

分類Dev

Spring webflux:httpをhttpsにリダイレクトする

分類Dev

Spring WebfluxでFlux <T>をMono <Response <T >>に変換する

分類Dev

Spring Boot 2.1.5、WebFlux、Reactor:MDCを適切に処理する方法

分類Dev

Spring WebFlux UnapiedMediaTypeExceptionを与えるすべてのAPI

Related 関連記事

  1. 1

    Spring Webflux Async PostgreSQLPublisherが最初の結果後に停止する

  2. 2

    Spring @Async vs Spring WebFlux

  3. 3

    Spring webflux、 `ServerResponse`のテスト

  4. 4

    Spring webflux、 `ServerResponse`のテスト

  5. 5

    Spring webfluxでのflatMap()とsubscribe()

  6. 6

    Spring Security AbstractAuthenticationProcessingFilterをWebFluxに移行する

  7. 7

    Spring WebFlux(Flux):動的に公開する方法

  8. 8

    Spring WebFlux(Flux):動的に公開する方法

  9. 9

    Spring WebFlux/ Reactor core

  10. 10

    Capturing response in spring Webflux

  11. 11

    Spring WebFlux with Kafka and Websockets

  12. 12

    Spring webflux testing

  13. 13

    Spring Webfluxが404を返す(Not Foud)

  14. 14

    Spring WebFluxでJaegerを使用する方法は?

  15. 15

    Spring WebFlux WebClient:実行を遅らせる

  16. 16

    春WebFlux対Spring MVCの(非同期)

  17. 17

    Spring Security WebFlux-認証付きの本体

  18. 18

    APIのSpring webfluxカスタム認証

  19. 19

    Spring 5 webfluxにHandlerFunctionsがあるのはなぜですか?

  20. 20

    Spring Framework WebFlux Reactive Programming

  21. 21

    Spring WebFlux webclient handle ConnectTimeoutException

  22. 22

    Java Spring WebFlux vs RxJava

  23. 23

    Spring WebFluxにSpringCloudSleuthのTracingWebFillterを実装する

  24. 24

    Spring Boot 2.2へのアップグレード後にSpringboot WebFluxテストが失敗する

  25. 25

    Spring-WebFlux Fluxがコンテキストで失敗する

  26. 26

    Spring webflux:httpをhttpsにリダイレクトする

  27. 27

    Spring WebfluxでFlux <T>をMono <Response <T >>に変換する

  28. 28

    Spring Boot 2.1.5、WebFlux、Reactor:MDCを適切に処理する方法

  29. 29

    Spring WebFlux UnapiedMediaTypeExceptionを与えるすべてのAPI

ホットタグ

アーカイブ