Spring WebFluxリアクティブWebSocketが接続のクローズを防止する

MiCkl:

Spring WebFluxをバックエンドにReactiveMongoRepositoryを使用し、Angular 4をフロントに使用して、アプリケーションの簡単なチャットモジュールに取り組んでいます。WebSocketSessionを介してデータを受信できますが、dbからすべてのメッセージをストリーミングした後、接続を維持してメッセージリストを更新します。誰かが私にそれを達成する方法の手掛かりを与えることができますか、あるいは私は間違った仮定に従っているのでしょうか?

WebSocketを担当するJavaバックエンド、私のサブスクライバーは現在の状態のみをログに記録し、そこには何も関係ありません。

WebFluxConfiguration:

@Configuration
@EnableWebFlux
public class WebSocketConfig {

private final WebSocketHandler webSocketHandler;

@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
    this.webSocketHandler = webSocketHandler;
}

@Bean
@Primary
public HandlerMapping webSocketMapping() {
    Map<String, Object> map = new HashMap<>();
    map.put("/websocket-messages", webSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(10);
    mapping.setUrlMap(map);
    return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
    return new WebSocketHandlerAdapter();
}


}

WebSocketHandlerの実装

@Component
public class MessageWebSocketHandler implements WebSocketHandler {

private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();

@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
}

@Override
    public Mono<Void> handle(WebSocketSession session) {
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toMessage)
            .subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete);
    return session.send(
            messageRepository.findAll()
                    .map(this::toJSON)
                    .map(session::textMessage));
}

private String toJSON(Message message) {
    try {
        return mapper.writeValueAsString(message);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

private Message toMessage(String json) {
    try {
        return mapper.readValue(json, Message.class);
    } catch (IOException e) {
        throw new RuntimeException("Invalid JSON:" + json, e);
    }
}
}

とMongoRepo

@Repository
public interface MessageRepository extends 
ReactiveMongoRepository<Message,String> {
}

フロントエンドの処理:

@Injectable()
export class WebSocketService {
  private subject: Rx.Subject<MessageEvent>;

  constructor() {
  }

  public connect(url): Rx.Subject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log('Successfully connected: ' + url);
    }
    return this.subject;
  }

  private create(url): Rx.Subject<MessageEvent> {
    const ws = new WebSocket(url);
    const observable = Rx.Observable.create(
      (obs: Rx.Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });
    const observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify(data));
        }
      }
    };
    return Rx.Subject.create(observer, observable);
  }
}

他のサービスでは、私のタイプへの応答から観察可能にマッピングしています

  constructor(private wsService: WebSocketService) {
    this.messages = <Subject<MessageEntity>>this.wsService
      .connect('ws://localhost:8081/websocket-messages')
      .map((response: MessageEvent): MessageEntity => {
        const data = JSON.parse(response.data);
        return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
      });
  }

そして最後に、接続が閉じられているために使用できないsend関数を使用したサブスクリプション:

  ngOnInit() {
    this.messages = [];
    this._ws_subscription = this.chatService.messages.subscribe(
      (message: MessageEntity) => {
        this.messages.push(message);
      },
      error2 => {
        console.log(error2.json());
      },
      () => {
        console.log('Closed');
      }
    );
  }

  sendTestMessage() {
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
  }
ブライアンクロゼル:

受信中のチャットメッセージがデータストアに永続化されていると仮定すると、Spring Data MongoDB Reactiveのカスタマイズ可能なカーソル機能を使用できます(リファレンスドキュメント参照)。

したがって、次のようにリポジトリに新しいメソッドを作成できます。

public interface MessageRepository extends ReactiveSortingRepository< Message, String> {

    @Tailable
    Flux<Message> findWithTailableCursor();
}

テール可能カーソルにはいくつかの制限があることに注意してください。mongoコレクションにはキャップを付ける必要があり、エントリは挿入順にストリーミングされます。

Spring WebFlux WebSocketサポートはまだSTOMPもメッセージブローカーもサポートしていませんが、そのようなユースケースにはこれがより良い選択かもしれません。

この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。

侵害の場合は、連絡してください[email protected]

編集
0

コメントを追加

0

関連記事

分類Dev

リアクティブプログラミング:Spring WebFlux:マイクロサービス呼び出しのチェーンを構築する方法は?

分類Dev

Spring Boot 2-Webflux-Websocket-圧縮をアクティブ化

分類Dev

Spring Boot WebfluxリアクティブAPI

分類Dev

Spring WebFlux:WebClientは2つのリアクティブRESTfulWebサービスを組み合わせます

分類Dev

Spring WebFlux:Spring Data MongoDBリアクティブリポジトリのnull値で例外を発行しますか?

分類Dev

Spring、WebSocketを使用して接続しているクライアントにメッセージをブロードキャストする方法は?

分類Dev

Spring webfluxが別のリクエストをブロックしている

分類Dev

Spring webfluxが別のリクエストをブロックしている

分類Dev

Spring Bootは、ルックアップテーブルがそれらを接続するときに、1つのエンティティが別のエンティティの中にネストされている2つのエンティティを返す方法は?

分類Dev

Spring WebFlux:1つの接続のみがサブスクライバーの受信を許可

分類Dev

postgreSQLに接続されたSpring Bootアプリケーションの「エンティティ/テーブルの関係」を修正する方法(POSTリクエストの問題)

分類Dev

@JmsListenerが使用するSpring BootのアクティブなJMS接続/セッションにアクセスする方法

分類Dev

Spring Data、MySQL、8時間の非アクティブ状態の後に接続が切断される

分類Dev

Spring Websocketのセキュリティ:ブローカーへの直接のSENDリクエストを防ぐ方法

分類Dev

Spring Boot WebSocketブローカーが最大接続数を設定

分類Dev

ゲートウェイをサービスアクティベーターに接続するSpring Integration

分類Dev

SpringとHibernateでエンティティ/クラスを1つだけ使用して3つのテーブルを接続する方法

分類Dev

Spring Security Webflux /リアクティブ例外処理

分類Dev

Spring Webflux-Webflux Webクライアントの使用時に、ログ接続IDと新しい接続ログが表示されない

分類Dev

Spring Webflux-Webflux Webクライアントの使用時に、ログ接続IDと新しい接続ログが表示されない

分類Dev

Springセキュリティを使用してSpringのコントローラーからUserDetailsオブジェクトにアクセスする

分類Dev

Spring Data JPAが例外をスローするネイティブクエリを削除する

分類Dev

Springのセキュリティremember-me実装がログアウト時にすべてのアクティブなトークンを削除するのはなぜですか?

分類Dev

Springブートキークロークアダプター+ Springセキュリティを使用してoAuthトークンの更新(アクセストークン+更新トークン)を強制する方法。

分類Dev

Springを使用した現在のWebSocket接続に関するメトリックの公開

分類Dev

Spring Boot 2 webflux thymeleafで現在のページのURLを取得して、cssアクティブリンクを設定するにはどうすればよいですか?

分類Dev

Spring Webflux-WebFilter内でリアクティブエンドポイントを呼び出す方法

分類Dev

内部オブジェクトのプロパティの条件がどこにあるかを問わず、Springデータクエリを作成する方法

分類Dev

Spring WebFluxを使用してリアクティブなメモリ内リポジトリをどのように構築しますか?

Related 関連記事

  1. 1

    リアクティブプログラミング:Spring WebFlux:マイクロサービス呼び出しのチェーンを構築する方法は?

  2. 2

    Spring Boot 2-Webflux-Websocket-圧縮をアクティブ化

  3. 3

    Spring Boot WebfluxリアクティブAPI

  4. 4

    Spring WebFlux:WebClientは2つのリアクティブRESTfulWebサービスを組み合わせます

  5. 5

    Spring WebFlux:Spring Data MongoDBリアクティブリポジトリのnull値で例外を発行しますか?

  6. 6

    Spring、WebSocketを使用して接続しているクライアントにメッセージをブロードキャストする方法は?

  7. 7

    Spring webfluxが別のリクエストをブロックしている

  8. 8

    Spring webfluxが別のリクエストをブロックしている

  9. 9

    Spring Bootは、ルックアップテーブルがそれらを接続するときに、1つのエンティティが別のエンティティの中にネストされている2つのエンティティを返す方法は?

  10. 10

    Spring WebFlux:1つの接続のみがサブスクライバーの受信を許可

  11. 11

    postgreSQLに接続されたSpring Bootアプリケーションの「エンティティ/テーブルの関係」を修正する方法(POSTリクエストの問題)

  12. 12

    @JmsListenerが使用するSpring BootのアクティブなJMS接続/セッションにアクセスする方法

  13. 13

    Spring Data、MySQL、8時間の非アクティブ状態の後に接続が切断される

  14. 14

    Spring Websocketのセキュリティ:ブローカーへの直接のSENDリクエストを防ぐ方法

  15. 15

    Spring Boot WebSocketブローカーが最大接続数を設定

  16. 16

    ゲートウェイをサービスアクティベーターに接続するSpring Integration

  17. 17

    SpringとHibernateでエンティティ/クラスを1つだけ使用して3つのテーブルを接続する方法

  18. 18

    Spring Security Webflux /リアクティブ例外処理

  19. 19

    Spring Webflux-Webflux Webクライアントの使用時に、ログ接続IDと新しい接続ログが表示されない

  20. 20

    Spring Webflux-Webflux Webクライアントの使用時に、ログ接続IDと新しい接続ログが表示されない

  21. 21

    Springセキュリティを使用してSpringのコントローラーからUserDetailsオブジェクトにアクセスする

  22. 22

    Spring Data JPAが例外をスローするネイティブクエリを削除する

  23. 23

    Springのセキュリティremember-me実装がログアウト時にすべてのアクティブなトークンを削除するのはなぜですか?

  24. 24

    Springブートキークロークアダプター+ Springセキュリティを使用してoAuthトークンの更新(アクセストークン+更新トークン)を強制する方法。

  25. 25

    Springを使用した現在のWebSocket接続に関するメトリックの公開

  26. 26

    Spring Boot 2 webflux thymeleafで現在のページのURLを取得して、cssアクティブリンクを設定するにはどうすればよいですか?

  27. 27

    Spring Webflux-WebFilter内でリアクティブエンドポイントを呼び出す方法

  28. 28

    内部オブジェクトのプロパティの条件がどこにあるかを問わず、Springデータクエリを作成する方法

  29. 29

    Spring WebFluxを使用してリアクティブなメモリ内リポジトリをどのように構築しますか?

ホットタグ

アーカイブ