为什么
大家的项目一定有这种场景,系统发布公告,消息更更新,商家和客户私聊,这种场景,为了保证实时性总不能是http一直长轮询,所以就要用到今天说的websocket
WebSocket 是一种网络通信协议,提供了一个在单个长时间连接上进行全双工、双向交互的通道。WebSocket 设计用来在浏览器和服务器之间进行交互式通信会话。用户可以在网页上发送消息到服务器并接收即时响应,而不必每次都重新加载页面。以下是有关 WebSocket 的详细笔记,包括其业务场景、为何出现、HTTP的不足,以及不同的消息推送方式。
为什么出现
这部分是计算机网络中的内容 建议看>
WebSocket 的出现主要是为了解决 HTTP 协议在实时通信方面的一些局限性:
- 连接重用:HTTP 协议在每次请求时都需要重新建立连接(HTTP/1.1 之前),这在需要频繁通信的场景中效率很低。
- 非实时性:传统的 HTTP 请求-响应模型不能满足实时互动的需求,因为服务器无法主动向客户端推送信息。
- 开销较大:每次 HTTP 请求都会携带完整的头信息,增加了不必要的网络负载。
HTTP的不足
- 单工通信:HTTP 是单工的,客户端发送请求后服务器才能响应,服务器不能主动发送消息。
- 频繁的连接开销:每个 HTTP 连接在传输完毕后通常都需要关闭,再次通信需要重新建立连接,这在需要频繁实时交互的应用中显得尤为低效。
- 头部开销:HTTP 请求和响应都包含大量的头部信息,这对于小数据包的传输非常不利。
并且由于http是单向的,必须有客户端发起请求,我们开发的服务端才会接收返回响应
常见的消息推送方式
轮询方式
SSe
以及现在说的websocket
执行过程
因为websocket 也是从http升级而来,更改协议
先了解http的执行过程
-
建立连接:浏览器(客户端)通过网络向服务器发起一个 TCP 连接。其中包含3次握手
-
第一次握手:客户端向服务器发送一个SYN包,告诉服务器我要跟你建立连接。这个SYN包里面包含了客户端的初始序列号。
-
第二次握手:服务器收到SYN包后,会回复一个SYN+ACK包给客户端。这个ACK是确认客户端的SYN包的,表示服务器已经收到了。同时,服务器也会发送一个自己的SYN包给客户端,告诉客户端我也要跟你建立连接。
-
第三次握手:客户端收到服务器的SYN+ACK包后,会再回复一个ACK包给服务器。这个ACK是确认服务器的SYN包的,表示客户端也收到了服务器的建立连接请求。
-
-
发送 HTTP 请求:客户端构建 HTTP 请求,包括方法(GET、POST、PUT、DELETE 等)、URI、协议版本,以及必要的请求头和请求体请求发送到服务器。
服务器处理请求:服务器接收到请求后,解析请求内容,并根据请求的资源和方法执行相应的动作(如从数据库检索数据、处理提交的表单等)。
发送 HTTP 响应: -
服务器构建 HTTP 响应,包括状态码(如 200 OK、404 Not Found)、响应头和响应体。
响应发回到客户端。 -
关闭连接:在 HTTP/1.0 中,默认情况下,服务器在发送响应后关闭 TCP 连接。HTTP/1.1 支持持久连接(Connection: keep-alive),允许多个请求和响应在同一个连接中传输,从而减少了建立和关闭连接的频率和成本。(这里包含四次挥手)
-
第一次挥手:客户端向服务器发送一个FIN包,通知将要断开连接了。
-
第二次挥手:服务器收到FIN包后,会回复一个ACK回调包给客户端,表示已经收到了客户端的断开连接请求。
-
第三次挥手:服务器在发送完所有数据后,会向客户端发送一个FIN包,告诉客户端我也要断开连接了。
-
第四次挥手:客户端收到服务器的FIN包后,会回复一个ACK包给服务器,表示已经收到了服务器的断开连接请求。
-
而websocket的过程可以通下面案列,出现请求状态101的就表示升级为web socket
比如gpt的回复页面
可以查看请求的消息,一般是初次响应的响应体
连接过程
客户端发起请求:客户端发送一个特殊的 HTTP 请求,请求升级到 WebSocket。这个请求看起来像一个标准的 HTTP 请求,但包含一些特定的头部字段来指示这是一个 WebSocket 升级请求:
-
Upgrade: websocket:明确请求升级到 WebSocket。
-
Connection: Upgrade:指示这是一个升级请求。
-
Sec-WebSocket-Key:一个 Base64 编码的随机值,服务器将用它来构造一个响应头,以确认连接的有效性。
-
Sec-WebSocket-Version:指示 WebSocket 协议的版本,通常是 13。
服务器响应:如果服务器支持 WebSocket,并接受升级请求,则它会返回一个 HTTP 101 Switching Protocols 响应,包含以下头部: -
Upgrade: websocket 和 Connection: Upgrade:确认升级到 WebSocket。
-
Sec-WebSocket-Accept:使用客户端的 Sec-WebSocket-Key 计算得出的一个值,用于验证连接。
建立 WebSocket 连接:一旦握手成功,原始的 HTTP 连接就升级到 WebSocket 连接。此时,客户端和服务器可以开始在这个长连接上双向发送数据。
数据传输:与 HTTP 不同,WebSocket 允许服务器直接发送消息给客户端,而不需要客户端先发送请求,这对于需要实时数据更新的应用非常有用(例如在线游戏、交易平台等)。
接下来就是实现了,模拟消息聊天
代码实现
代码地址
前端页面(vue+vuetify ui) 有兴趣可以看看
实现步骤
先讲一下原理,之前说得websocket是双向通道,客户端连接服务端的端点,也就是没有一个连接就有一个端点实例
再java中是通过会话管理通道
建立过程可以知道,再建立连接之前会先进行握手,那么我们就可以再握手的时候对该线程用户进行验证,然后具体端点会有几个对应的生命周期 建立成功 接收到消息 连接关闭 连接异常我们就可以写对应事件,然后再建立成功时候,可以把对应建立成功的会话,再把对应的通道和用户ida进行保存,这样就可以根据id找到具体通道,进行发送消息。
代码逻辑客户端像服务端发起连接,服务端就要有一个路由对应这个连接进行处理,这个路由称为端点endpoint
所以核心就是写端点 完成端点生命周期
值得注意:我的boot 是3,jdk 17,虽然过程一样,但是springboot2关于网络编程这一块的包都是再javax ,3开始就是 jakarta包了
1.添加依赖
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-websocketartifactId>
dependency>
- 配置类
作用是对端点进行扫描
在Spring Boot中,ServerEndpointExporter是一个非常重要的Bean,
* 因为它负责扫描和注册所有带有@ServerEndpoint注解的WebSocket端点。
*
* 以下是该Bean的作用及其配置的详细说明:
*
* 作用
* 扫描带有@ServerEndpoint注解的类:ServerEndpointExporter会自动扫描应用程序中的所有@ServerEndpoint注解的类,并将它们注册为WebSocket端点。
* 注册WebSocket端点:它将找到的WebSocket端点注册到默认的容器(如Tomcat、Jetty、Undertow等),使这些端点能够接受和处理WebSocket连接和消息。
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
3.写端点配置类 (每个端点的作用不一样所以对端点的配置类也不一样)
集成该配置类进行重写 可以见名知意的发现:检查跨域 获取容器默认配置,获取端点实列都是一些等,而主要这里修改的就是 modifyHandshake(修改握手) 这里说的是还没有建立的时候,这里可以进行处理(比如保存用户信息,解析token,生成唯一info等等)
@Configuration
public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator {
/**
* 注意:没有一个客户端发起握手,端点就有一个新的实列 那么引用的这个配置也是新的实列 所以内存地址不一样 这里sec的用胡属性也不同就不会产生冲突
* 修改握手机制 就是第一次http发送过来的握手
* @param sec 服务器websocket端点的配置
* @param request
* @param response
*/
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
// super.modifyHandshake(sec, request, response);
HttpSession httpSession =(HttpSession) request.getHttpSession();
// 将从握手的请求中获取httpsession
/**
* 一般会在请求头中添加token 解析出来id作为键值对
*/
MapString, Object> properties = sec.getUserProperties();
/**
* 一个客户端和和服务器发起一次请求交互 就有一个唯一session
*存储session 是为了能够从其中用户用户info 到时候作为wssession的key 但是session 不共享 为此redis改进 或者token也是可以的
* 这里使用UUID作为标识
*/
// properties.put(HttpSession.class.getName(),httpSession);
String sessionKey = UUID.randomUUID().toString().replaceAll("-", "");
properties.put("Connected",sessionKey);
}
}
代码解析:该端点配置是再模拟新用户访问一个网站,网站主动给用户推送广告的情况,所以用户没有登录,就无法从httpsession,请求头,token,redis中获取个人信息了,所以这里使用的uuid,作为游客的websocketid
因为每有一个客户端建立websocket连接就有一个端点实列,一个端点配置,所以这里的 properties.put(“Connected”,sessionKey);key值相同无妨
- 编写端点引用编写的端点配置类 注意每个周期的参数顺序不能错否则会报错
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/chat",configurator = GetHttpSessionConfig.class)//协议升级路由
public class ChatEndpoint {//只要和该路由建立连接 就new 一个新的实列 对应一个该endpoint对象
//模拟储存当前用户的朋友全信息
private static final MapString, Session> Friendgroup=new ConcurrentHashMapString,Session>();//线程安全的银蛇
private HttpSession httpSession;//存放当前用户信息
/**
* 定义的当前用户
*/
private String userId;
/**
* 第一个参数必须是session
* @param session
* @param sec 不能是Server
*/
@OnOpen
public void onOpen(Session session,EndpointConfig sec){
// 1.保存当前连接用户状态
//每个端点获取该端点携带的httpsession数据
// this.httpSession = (HttpSession) sec.getUserProperties().get(HttpSession.class.getName());
// this.httpSession.getAttribute("user");
String sessionKey =(String) sec.getUserProperties().get("Connected");
this.userId=sessionKey;//用户上下文填充
//2.把成功建立升级的会话让放入会话组
Friendgroup.put(sessionKey,session);
//之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
System.out.println("websocket建立成功");
// 2.广播消息(如果是好咧别表上下) 模拟放房间提示
String content="用户id"+sessionKey+"已经上线 愉快玩耍吧";
Message message = Message.builder()
.content(content)
.isSystem(true).build();
broadcast(message);
System.out.println("WebSocket 连接建立成功: " + sessionKey);
// 3.
}
/**
* 当断开连接
* @param session
*/
@OnClose
public void onClose(Session session) {
// 找到关闭会话对应的用户 ID 并从 Friendgroup 中移除
String sessionKey = this.userId;
if (sessionKey != null) {
Friendgroup.remove(sessionKey);
// 广播消息给所有好友
String content = "用户ID " + sessionKey + " 已经下线";
Message message = Message.builder()
.content(content)
.isSystem(true)
.build();
broadcast(message);
}
}
/**
* 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
* @param //前端可以携带来自forname 但是我在这个实列化内部做了一个上下文
*
* 如果接收的消息的是对象 需要解码器,@PathParam("roomId") String roomId, 如果参数写在了第一位 那么就需要使用该注解获取路由的参数信息
*/
@OnMessage
public void onMessage(Session session,String message) throws IOException {
System.out.println("接收到消息"+message);
Message o = (Message) JSON.parse(message);
Message message1 = Message.builder().sender(userId)
.toReceiver(o.getToReceiver())
.content(o.getContent())
.build();
Session session1 = Friendgroup.get(userId);
session1.getBasicRemote().sendText(JSON.toJSONString(message1));
// 你的其他逻辑
}
/**
* 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
*/
@OnError
public void onError(Throwable error) {
System.out.println("onError......"+error.getMessage());
}
/**
* 将系统的公告等需要推送的消息发布给所有已经建立连接的用户
* 用于系统更细发布公告之类的 或者用户上线通知其他
* @param message
*/
private void broadcast(Message message) throws RuntimeException {
if (message.isSystem()){
Friendgroup.entrySet()
.forEach(item->{
// 遍历每一个键值对
Session userSession = item.getValue();
try {
userSession
.getBasicRemote() //同步消息发送器
.sendText(JSON.toJSONString(message));
} catch (IOException e) {
// 记录日志 保存文件便于查看
throw new RuntimeException(e);
}
;
});
return;
}
else{
try {
Friendgroup.get(message.getSender())
.getBasicRemote()
.sendText(JSON.toJSONString(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
发消息的核心代码: Friendgroup.get(message.getSender())
.getBasicRemote()
.sendText(JSON.toJSONString(message));
端点中引用的消息对象:
/**
* 定义的消息发送对象
*/
@Data
@Builder
public class Message {
//没有toname toname 是发送请求时候携带的
//是否系统消息
private boolean isSystem;
//私聊情况 :a->服务器->b显示 弹幕:a->服务器->广播 ->前端消息
private String sender;//来自哪一位用户发的 如果是私聊
private String content; //消息内容
//A->服务器 指定的发送私聊人 这里id
private String toReceiver;
}
代码解读;
各个注解标识代表对应的生命周期
onopen建立成功:这里的逻辑就是读取配置类中的上下文,得到用户信息存入自身的上下文私有对象,然后用这个作为key,把管理socket的会话存入该map(线程安全:多个websocket会话 必须选这个),然后发布广公告
onclose :关闭前触发,移除会话组,然后提示
onerror:发生错误时候触发,一般记录日志
onMessage:接收到消息时候触发
很方便记忆的是前端也是这几个并且命名也一样
进行测试:通过alert打印出来
确实接收到服务器消息
日志也输出成功
当然这样就实现了上号时候服务器之前的群发,当然一对一私发也可以但是需要再请求头或者哪里携带身份凭证,握手测试demo 所以就是用的无携带凭证后端生成随机uuid作为会话id,所以为了演示点对点的私聊,这里就不做jwt了选择发送握手的时候携带参数
优化演示demo
前端演示
前端这里不想再做多用户了,直接修改钩子函数多创建几个窗口,让这些角色都再后台注册会话,写了俩个页面,一个明日香,一个真嗣
并且用字体颜色来表明俩人
修改后端端点代码
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/chat/{userName}",configurator = GetHttpSessionConfig.class)//协议升级路由
public class ChatEndpoint {//只要和该路由建立连接 就new 一个新的实列 对应一个该endpoint对象
//模拟储存当前用户的朋友全信息
private static final MapString, Session> Friendgroup=new ConcurrentHashMapString,Session>();//线程安全的银蛇
private HttpSession httpSession;
/**
* 定义的当前用户
*/
private String userId;
/**
* 第一个参数必须是session
* @param session
* @param sec 不能是Server
*/
@OnOpen
public void onOpen(Session session,EndpointConfig sec,@PathParam("userName") String userName){
this.userId=userName;//用户上下文填充
//2.把成功建立升级的会话让放入会话组
String sessionKey=userName;
Friendgroup.put(userName,session);
//之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
System.out.println("websocket建立成功");
// 2.广播消息(如果是好咧别表上下) 模拟放房间提示
String content="用户id"+sessionKey+"已经上线 愉快玩耍吧";
Message message = Message.builder()
.content(content)
.isSystem(true).build();
broadcast(message);
System.out.println("WebSocket 连接建立成功: " + sessionKey);
// 3.
}
/**
* 当断开连接
* @param session
*/
@OnClose
public void onClose(Session session) {
// 找到关闭会话对应的用户 ID 并从 Friendgroup 中移除
String sessionKey = this.userId;
if (sessionKey != null) {
Friendgroup.remove(sessionKey);
// 广播消息给所有好友
String content = "用户ID " + sessionKey + " 已经下线";
Message message = Message.builder()
.content(content)
.isSystem(true)
.build();
broadcast(message);
}
}
/**
* 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
* @param //前端可以携带来自forname 但是我在这个实列化内部做了一个上下文
*
* 如果接收的消息的是对象 需要解码器,@PathParam("roomId") String roomId, 如果参数写在了第一位 那么就需要使用该注解获取路由的参数信息
*/
@OnMessage
public void onMessage(Session session,String message) throws IOException {
System.out.println("接收到消息"+message);
JSONObject json = JSON.parseObject(message);
// 从JSONObject中提取必要的字段
String sender = json.getString("sender");
String content = json.getString("content");
String toReceiver = json.getString("toReceiver");
// 创建Message对象
Message message1 = Message.builder()
.sender(sender)
// .toReceiver(toReceiver) //发给谁这个信息无需填写
.content(content)
.build();
//调用发送方的会话 发送给他的客户端显示
Session session1 = Friendgroup.get(toReceiver);
session1.getBasicRemote().sendText(JSON.toJSONString(message1));
// 你的其他逻辑
}
/**
* 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
*/
@OnError
public void onError(Throwable error) {
System.out.println("onError......"+error.getMessage());
}
/**
* 将系统的公告等需要推送的消息发布给所有已经建立连接的用户
* 用于系统更细发布公告之类的 或者用户上线通知其他
* @param message
*/
private void broadcast(Message message) throws RuntimeException {
if (message.isSystem()){
Friendgroup.entrySet()
.forEach(item->{
// 遍历每一个键值对
Session userSession = item.getValue();
try {
userSession
.getBasicRemote() //同步消息发送器
.sendText(JSON.toJSONString(message));
} catch (IOException e) {
// 记录日志 保存文件便于查看
throw new RuntimeException(e);
}
;
});
return;
}
else{
try {
Friendgroup.get(message.getSender())
.getBasicRemote()
.sendText(JSON.toJSONString(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
主要修改是 请求时候直接再路由携带了用户名字,那么这里演示就不再使用uuid随机生成的key来管理会话了,使用用户名字来管理会话模拟好友,实际开发还是需要再握手的时候解析http请求数据来存储关键信息哈
端点携带参数的形式很想restful风格
/路由/{参数名}
@PathParam("userName") String userName
私聊逻辑
发送私聊的一对一实现原理 发送消息时候携带需要发送的对象key,通过该健获取该用户的会话然后发送信息,这里的key是用户名
@OnMessage
public void onMessage(Session session,String message) throws IOException {
System.out.println("接收到消息"+message);
JSONObject json = JSON.parseObject(message);
// 从JSONObject中提取必要的字段
String sender = json.getString("sender");
String content = json.getString("content");
String toReceiver = json.getString("toReceiver");
// 创建Message对象
Message message1 = Message.builder()
.sender(sender)
// .toReceiver(toReceiver) //发给谁这个信息无需填写
.content(content)
.build();
//调用发送方的会话 发送给他的客户端显示
Session session1 = Friendgroup.get(toReceiver);
session1.getBasicRemote().sendText(JSON.toJSONString(message1));
// 你的其他逻辑
}
前端代码:这里写了发送方数据是为了前端渲染页面,如果再其他里面做了处理,可以不用写发送方数据,减少负载,主要是发送消息对容和对方的会话存储的key
异步优化
当出现高并发等高性能需求时候,可以采用异步发发送器,让线程不在这里堵塞
替换为AsycRemote
测试:
当明日香发送你好时候
真嗣用户可以成功收到,并且回复也可以收到
就此完成实现,
总结步骤
添加依赖:确保在pom.xml中添加Spring WebSocket和WebSocket依赖。
创建WebSocket处理器(端点):编写一个处理WebSocket消息的处理器。
完成对应的生命周期
如果需要传递http第一次握手时候处理信息 需要添加对应的处理配置
配置WebSocket:配置WebSocket相关的Bean和端点(值得注意的是每一个端点对象对一个用户线程 所以spring的单实列bean和异步处理再这里无法生效 具体会在踩坑笔记中提及)整合的一些细节
实践细节
websocket 中无法依赖注入?
我在socket中收到的消息 想要使用redis 或者mq进行存储和转发 但是却发现@Auwered字段和@Resouce都无法注入依赖
@ServerEndpoint注解的类在WebSocket服务器端点中无法直接使用Spring的依赖注入机制,如字段注入(@Autowired和@Resource)或构造器注入。这是因为@ServerEndpoint注解的类是由Java WebSocket API(JSR 356)管理的,而不是由Spring容器直接管理的。
为了在@ServerEndpoint类中实现依赖注入,可以使用SpringConfigurator或其他类似的配置器来辅助实现。以下是一个示例,展示了如何使用SpringConfigurator来实现依赖注入:
所以需要修稿配置类
端点配置
@Component
public class SpringConfigurator extends ServerEndpointConfig.Configurator {
private static ApplicationContext applicationContext;
/**、
* 这段代码的作用是将Spring的ApplicationContext注入到SpringConfigurator类中,以便在WebSocket端点中实现依赖注入。
* 具体来说,这段代码通过Spring的@Autowired注解将Spring上下文(ApplicationContext)注入到SpringConfigurator类的静态字段中。
* @param applicationContext
*/
@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
SpringConfigurator.applicationContext = applicationContext;
}
/**
* @doc
* 为什么写了这个类就可以在 WebSocket 端点中进行注入数据?
* 在 WebSocket 端点中,您可以使用 Spring 的 @Autowired 注解注入 Spring 管理的 Bean,因为 SpringConfigurator 类配置了 getEndpointInstance
* 方法来从 Spring 上下文中获取 WebSocket 端点实例。这允许您在 WebSocket 端点中利用 Spring 的依赖注入和管理特性。
* @param clazz
* @return
* @param
* @throws InstantiationException
*/
@Override
public T> T getEndpointInstance(ClassT> clazz) throws InstantiationException {
return applicationContext.getBean(clazz);
}
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
// super.modifyHandshake(sec, request, response);
// 将从握手的请求中获取httpsession
String device_id = request.getParameterMap().get("device_id").get(0);
/**
* 一般会在请求头中添加token 解析出来id作为键值对
*/
java.util.MapString, Object> properties = sec.getUserProperties();
/**
*/
properties.put("connection",Device_PREFIX+device_id);
}
}
SOCKET 端点
@Slf4j
@Component
@ServerEndpoint(value = "/pda", configurator = SpringConfigurator.class)
public class DeviceEndpoint {
private final MqService mqService;
@Autowired
public DeviceEndpoint(MqService mqService) {
this.mqService = mqService;
}
public static final ConcurrentHashMapString, Session> deviceMap = new ConcurrentHashMap>();
private String deviceId="";
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
deviceId = (String) config.getUserProperties().get("connection");
deviceMap.put(deviceId, session);
log.info("设备连接:{}", deviceId);
}
@OnMessage
public void onMessage(Session session, String message) {
log.info("接收到设备数据:{}", message);
mqService.sysSend(message, deviceId );
// session.getBasicRemote().sendText("数据处理成功");
}
@OnClose
public void onClose(Session session) {
String deviceId = (String) session.getUserProperties().get("connection");
if (deviceId != null && !deviceId.isEmpty()) {
deviceMap.remove(deviceId);
log.info("设备断开连接:{}", deviceId);
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket error for session {}: {}", session.getId(), error.getMessage(), error);
}
}