나는 websocket 서버를 만들기 위해 akka java websocket 자습서를 따르고 있습니다. 두 가지 추가 기능을 구현하고 싶습니다.
다음은 내 클라이언트 계산 구현을 최소한으로 수정 한 원본 akka Java 서버 예제 코드입니다.
public class websocketServer {
private static AtomicInteger connections = new AtomicInteger(0);//connected clients count.
public static class MyTimerTask extends TimerTask {
//called every second to display number of connected clients.
@Override
public void run() {
System.out.println("Conncurrent connections: " + connections);
}
}
//#websocket-handling
public static HttpResponse handleRequest(HttpRequest request) {
HttpResponse result;
connections.incrementAndGet();
if (request.getUri().path().equals("/greeter")) {
final Flow<Message, Message, NotUsed> greeterFlow = greeter();
result = WebSocket.handleWebSocketRequestWith(request, greeterFlow);
} else {
result = HttpResponse.create().withStatus(413);
}
connections.decrementAndGet();
return result;
}
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create();
TimerTask timerTask = new MyTimerTask();
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(timerTask, 0, 1000);
try {
final Materializer materializer = ActorMaterializer.create(system);
final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request);
CompletionStage<ServerBinding> serverBindingFuture =
Http.get(system).bindAndHandleSync(
handler, ConnectHttp.toHost("****", 1183), materializer);
// will throw if binding fails
serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
timer.cancel();
} catch (Exception e){
e.printStackTrace();
}
finally {
system.terminate();
}
}
//#websocket-handler
/**
* A handler that treats incoming messages as a name,
* and responds with a greeting to that name
*/
public static Flow<Message, Message, NotUsed> greeter() {
return
Flow.<Message>create()
.collect(new JavaPartialFunction<Message, Message>() {
@Override
public Message apply(Message msg, boolean isCheck) throws Exception {
if (isCheck) {
if (msg.isText()) {
return null;
} else {
throw noMatch();
}
} else {
return handleTextMessage(msg.asTextMessage());
}
}
});
}
public static TextMessage handleTextMessage(TextMessage msg) {
if (msg.isStrict()) // optimization that directly creates a simple response...
{
return TextMessage.create("Hello " + msg.getStrictText());
} else // ... this would suffice to handle all text messages in a streaming fashion
{
return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText()));
}
}
//#websocket-handler
}
아래의 두 가지 중요 사항을 해결하십시오.
1-활성 연결을 효과적으로 계산하려면 메트릭을 HttpRequest 흐름이 아닌 메시지 흐름에 연결해야합니다. 을 사용하여이 작업을 수행 할 수 있습니다 watchTermination
. 아래 handleRequest 메서드에 대한 코드 예제
public static HttpResponse handleRequest(HttpRequest request) {
HttpResponse result;
if (request.getUri().path().equals("/greeter")) {
final Flow<Message, Message, NotUsed> greeterFlow = greeter().watchTermination((nu, cd) -> {
connections.incrementAndGet();
cd.whenComplete((done, throwable) -> connections.decrementAndGet());
return nu;
});
result = WebSocket.handleWebSocketRequestWith(request, greeterFlow);
} else {
result = HttpResponse.create().withStatus(413);
}
return result;
}
2-서버가 독립적으로 메시지를 보낼 수 있도록을 사용하여 메시지 흐름을 만들 수 있습니다 Flow.fromSinkAndSource
. 아래 예 (하나의 메시지 만 전송 됨) :
public static Flow<Message, Message, NotUsed> greeter() {
return Flow.fromSinkAndSource(Sink.ignore(),
Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!"))
);
}
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다