大数跨境
0
0

Spring WebFlux基于反应式WebSocket的应用

Spring WebFlux基于反应式WebSocket的应用 Spring全家桶实战案例
2023-01-02
0
导读:Spring WebFlux基于反应式WebSocket的应用

环境:Springboot2.4.13


WebSocket介绍

WebSocket协议RFC 6455提供了一种标准化的方式,通过一个TCP连接在客户端和服务器之间建立全双工、双向的通信通道。它是一个不同于HTTP的TCP协议,但设计为在HTTP之上工作,使用80和443端口,并允许重用现有的防火墙规则。

WebSocket交互开始于一个HTTP请求,使用HTTP Upgrade Header进行升级,在本例中是切换到WebSocket协议。下面的例子展示了这种交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1

Host: localhost:8080

Upgrade: websocket // The Upgrade header.
Connection: Upgrade // Using the Upgrade connection.

Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==

Sec-WebSocket-Protocol: v10.stomp, v11.stomp

Sec-WebSocket-Version: 13 Origin: http://localhost:8080

支持WebSocket的服务器会返回类似下面的输出,而不是通常的200状态码:

HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=

Sec-WebSocket-Protocol: v10.stomp

握手成功后,HTTP upgrade请求的TCP套接字保持打开,客户端和服务器可以继续发送和接收消息。

对WebSockets工作原理的完整介绍超出了本文档的范围。请参阅RFC 6455、HTML5中有关WebSocket的章节,或者网上的任何介绍和教程。

注意,如果WebSocket服务器运行在web服务器(例如nginx)后面,你可能需要配置它来将 WebSocket升级请求传递给WebSocket服务器。

自定义HandlerMapping

自定义HandlerMapping是为了在项目中能够自动的失败0到N的不同请求的WebSocket连接

public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping {

@Override
public void initApplicationContext() throws BeansException {
Map<String, WebSocketHandler> handlers = new HashMap<>();
ApplicationContext context = getApplicationContext() ;
Map<String, WebSocketHandler> beans = context.getBeansOfType(WebSocketHandler.class) ;
for (WebSocketHandler handler : beans.values()) {
WebSocketMapping webSocketMapping = AnnotatedElementUtils.findMergedAnnotation(handler.getClass(), WebSocketMapping.class) ;
if (webSocketMapping != null) {
String value = webSocketMapping.value() ;
if (StringUtils.hasLength(value)) {
handlers.put(value, handler) ;
}
}
}
if (handlers.size() > 0) {
this.setUrlMap(handlers) ;
super.initApplicationContext();
}
}

@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE ;
}

}

在这HandlerMapping中使用了自定义的Mapping注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WebSocketMapping {
/**请求路径*/
String value() default "" ;
}

通过上面的HandlerMapping处理能够识别出当前环境下所有带有@WebSocketMapping注解的Bean,然后进行注册到当前的URL集合中。

@Component
@WebSocketMapping("/chat2/{name}")
public class ChatWebSocketHandler2 implements WebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(ChatWebSocketHandler2.class) ;
public static final Map<String, WebSocketWrapper> sessions = new ConcurrentHashMap<>() ;

@Override
public Mono<Void> handle(WebSocketSession session) {
System.out.println(session) ;
URI uri = session.getHandshakeInfo().getUri() ;
String path = uri.getPath() ;
String username = path.split("/")[2] ;
logger.info("Client id: {} Connected, Request URI: {}", session.getId(), uri) ;
HttpHeaders headers = session.getHandshakeInfo().getHeaders() ;
logger.info("Request Headers: {}", headers) ;
Mono<Void> receive = session.receive()
.doOnNext(message -> {
// 这里如果header中没有to,那么返回null,所以要做好判断,不然默认异常是不会被抛出的
// 导致连接即关闭,只有加了下面的onErrorMap才能看到异常信息
List<String> tos = headers.get("to") ;
if (tos !=null && !tos.isEmpty()) {
String to = tos.get(0) ;
WebSocketWrapper wsw = sessions.get(to) ;
if (wsw != null) {
String msg = message.getPayloadAsText() ;
logger.info("给 {} 发送消息: {}", tos, msg) ;
wsw.send(msg) ;
}
} else {
logger.info("Chat 接收到消息: {}", message.getPayloadAsText());
}
}).onErrorMap(ex -> {
ex.printStackTrace();
return ex ;
}).then() ;
Mono<Void> sender = session.send(Flux.create(sink -> sessions.put(username, new WebSocketWrapper(session, sink)))) ;
return Mono.zip(receive, sender).doFinally(signalType -> {
logger.info("Client id: {}, 断开连接. 信号: {}", session.getId(), signalType.name());
sessions.remove(username) ;
session.close() ;
}).then() ;
}
}

WebSocketWrapper

public class WebSocketWrapper {
private WebSocketSession session ;
private FluxSink<WebSocketMessage> sink ;
public void send(String payload) {
this.sink.next(session.textMessage(payload)) ;
}
}

测试:

点对点消息


完毕!!!


【声明】内容源于网络
0
0
Spring全家桶实战案例
Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
内容 832
粉丝 0
Spring全家桶实战案例 Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
总阅读38
粉丝0
内容832