WebSocket
WebSocket is a bi-directional, full-duplex protocol that runs over a single TCP connection established through an HTTP Upgrade handshake. It is widely used for real-time client-server communication (chat, live updates, gaming) and supports custom subprotocols.
Spring WebFlux provides basic reactive WebSocket support.
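To make the HTTP side of the protocol concrete: a WebSocket connection starts as an ordinary HTTP request carrying a `Sec-WebSocket-Key` header, and the server proves it understood the upgrade by returning a `Sec-WebSocket-Accept` value derived from that key (RFC 6455). The sketch below shows that derivation in plain Java. The class and method names are hypothetical and are not part of Spring, which performs this step for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of Spring or the sample project.
final class HandshakeSketch {

    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key:
    // Base64(SHA-1(key + GUID)).
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455, section 1.3.
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
        // prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```

Once this exchange succeeds, both sides stop speaking HTTP and exchange WebSocket frames over the same connection.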
Server Side
First, create a WebSocketHandler to process incoming messages and send responses back to the client.
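import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.reactivestreams.Publisher;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PostsWebSocketHandler implements WebSocketHandler {
    private final PostRepository posts;

    public PostsWebSocketHandler(PostRepository posts) {
        this.posts = posts;
    }

    // Subprotocols this handler accepts during the handshake.
    @Override
    public List<String> getSubProtocols() {
        return Arrays.asList("test");
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Assumes findAll() returns a Flux of posts. Take the last post it emits
        // and convert it to a text frame inside the reactive pipeline.
        Flux<WebSocketMessage> messages = this.posts.findAll()
                .takeLast(1)
                .map(post -> session.textMessage(post.toString()));
        return doSend(session, messages);
    }

    // Small delay to avoid client-side timing issues when sending initial frames.
    private Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
        return session.send(Mono.delay(Duration.ofMillis(100)).thenMany(output));
    }
}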
Then register this handler in a HandlerMapping bean.
@Bean
public HandlerMapping handlerMapping() {
    // Map URL paths to their WebSocket handlers.
    Map<String, WebSocketHandler> map = new HashMap<>();
    map.put("/echo", new EchoWebSocketHandler());
    map.put("/posts", new PostsWebSocketHandler(this.posts));
    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setUrlMap(map);
    // Give this mapping precedence over annotated controllers and static resources.
    mapping.setOrder(-1);
    return mapping;
}
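The mapping above registers an EchoWebSocketHandler that this page does not show. A minimal sketch of such a handler might look like the following, assuming it only needs to send each received text frame back to the sender. The actual class in the sample project may be implemented differently.

```java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Illustrative sketch; the sample project's EchoWebSocketHandler may differ.
public class EchoWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Copy the text payload of every inbound frame into a new outbound text frame.
        return session.send(
                session.receive()
                        .map(message -> session.textMessage(message.getPayloadAsText())));
    }
}
```

Copying the payload into a new message avoids having to manage the reference count of the inbound buffer, at the cost of one extra allocation per frame.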
In a non-Spring Boot application, you also have to declare a WebSocketHandlerAdapter bean.
Source code: spring-reactive-sample/websocket.
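As a rough sketch, the adapter can be declared in a configuration class like the one below. The class name `WebSocketAdapterConfig` is a placeholder chosen for this example, and the no-argument constructor is assumed to be sufficient, which relies on the default handshake service for the detected server.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

// Hypothetical configuration class name, used only for this example.
@Configuration
class WebSocketAdapterConfig {

    // Allows the DispatcherHandler to invoke WebSocketHandler instances
    // returned by the HandlerMapping.
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
```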
Client Side
Now let's move to the client side and interact with this WebSocket endpoint.
Spring provides a built-in, Reactor Netty-based WebSocket client for communicating with the server.
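import java.net.URI;
import java.time.Duration;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// "log" is assumed to be a logger declared in the enclosing class.
WebSocketClient client = new ReactorNettyWebSocketClient();
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
// Replay sink (Reactor 3.4+) that keeps the received messages for later inspection.
Sinks.Many<String> output = Sinks.many().replay().limit(count);

client.execute(URI.create("ws://localhost:8080/echo"),
        session -> {
            log.debug("Starting to send messages");
            return session
                    // Send all outbound messages, then read the same number of replies.
                    .send(input.doOnNext(s -> log.debug("outbound " + s)).map(session::textMessage))
                    .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                    .doOnNext(s -> {
                        log.debug("inbound " + s);
                        output.tryEmitNext(s);
                    })
                    .then()
                    .doOnSuccess(v -> log.debug("Done with success"))
                    .doOnError(ex -> log.debug("Done with " + ex.getMessage()));
        })
        // Wait at most five seconds for the whole exchange to finish.
        .block(Duration.ofMillis(5000));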
Source code: spring-reactive-sample/client.
For a more realistic example application, see hantsy/angular-spring-websocket-sample/.