SpringBoot整合netty-socketio
- 一、准备工作
-
- 1、maven依赖
- 2、socketIO的yml配置
- 3、socketIO的config代码
- 4、SocketIOServer启动或关闭
- 5、项目目录结构
- 二、客户端和服务端建立连接
-
- 1、服务端
-
- 1.1 用户缓存信息ClientCache
- 1.2 SocketIOServerHandler
- 2、客户端
- 3、简单的演示
- 三、广播
-
- 1、SocketIO基础概念图
- 2、定义namespace
- 3、创建namespace所属的Handler
-
- 3.1 自定义Handler
- 3.2 监听自定义Handler
- 3.3演示
-
- 3.3.1 正确演示
- 3.3.1 错误演示
- 四、常用方法
-
- 1、加入房间
- 2、离开房间
- 3、获取用户所有房间
- 4、发送消息给指定的房间
- 5、广播消息给指定的Namespace下所有客户端
- 6、点对点发送
这次整合借鉴了以下博主的智慧
websocket和socketio的区别
socket.io.js最简版单页HTML测试工具
Netty-SocketIO多路复用
springboot学习(四十三) springboot使用netty-socketio实现消息推送
SpringBoot集成SocketIO
一、准备工作
1、maven依赖
socketio的核心依赖就只有这个
dependency>
groupId>com.corundumstudio.socketiogroupId>
artifactId>netty-socketioartifactId>
version>1.7.19version>
dependency>
2、socketIO的yml配置
#自定义socketio配置,你可以直接硬编码,看个人喜好
socketio:
# socketio请求地址
host: 127.0.0.1
# socketio端口
port: 9999
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength: 1048576
# 设置http交互最大内容长度
maxHttpContentLength: 1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)
bossCount: 1
# 连接数大小
workCount: 100
# 允许客户请求
allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
upgradeTimeout: 1000000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout: 6000000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval: 25000
# 命名空间,多个以逗号分隔,
namespaces: /test,/socketIO
#namespaces: /socketIO
3、socketIO的config代码
package com.gzgs.socketio.common.config;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Optional;
@Configuration
public class SocketIOConfig {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Value("${socketio.namespaces}")
private String[] namespaces;
@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setSocketConfig(socketConfig);
config.setHostname(host);
config.setPort(port);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setAllowCustomRequests(allowCustomRequests);
config.setUpgradeTimeout(upgradeTimeout);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
//服务端
final SocketIOServer server = new SocketIOServer(config);
//添加命名空间(如果你不需要命名空间,下面的代码可以去掉)
Optional.ofNullable(namespaces).ifPresent(nss ->
Arrays.stream(nss).forEach(server::addNamespace));
return server;
}
//这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent
@Bean
public SpringAnnotationScanner springAnnotationScanner() {
return new SpringAnnotationScanner(socketIOServer());
}
}
4、SocketIOServer启动或关闭
我在启动类里面定义了启动或者关闭SocketIOServer
package com.gzgs.socketio;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;
@SpringBootApplication
public class SocketioServerApplication {
public static void main(String[] args) {
SpringApplication.run(SocketioServerApplication.class, args);
}
}
@Component
@Slf4j
class SocketIOServerRunner implements CommandLineRunner, DisposableBean {
@Autowired
private SocketIOServer socketIOServer;
@Override
public void run(String... args) throws Exception {
socketIOServer.start();
log.info("SocketIOServer==============================启动成功");
}
@Override
public void destroy() throws Exception {
//如果用kill -9 这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
socketIOServer.stop();
log.info("SocketIOServer==============================关闭成功");
}
}
springboot整合socketIO的工作已经完成了
5、项目目录结构
参考下即可,核心是如何配置以及如何启动/关闭SocketIO
二、客户端和服务端建立连接
1、服务端
1.1 用户缓存信息ClientCache
package com.gzgs.socketio.common.cache;
import com.corundumstudio.socketio.SocketIOClient;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* 这是存储用户的缓存信息
*/
@Component
public class ClientCache {
//用于存储用户的socket缓存信息
private static ConcurrentHashMapString, HashMapUUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap>();
//保存用户信息
public void saveClient(String userId,UUID sessionId,SocketIOClient socketIOClient){
HashMapUUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);
if(sessionIdClientCache == null){
sessionIdClientCache = new HashMap>();
}
sessionIdClientCache.put(sessionId,socketIOClient);
concurrentHashMap.put(userId,sessionIdClientCache);
}
//获取用户信息
public HashMapUUID,SocketIOClient> getUserClient(String userId){
return concurrentHashMap.get(userId);
}
//根据用户id和session删除用户某个session信息
public void deleteSessionClientByUserId(String userId,UUID sessionId){
concurrentHashMap.get(userId).remove(sessionId);
}
//删除用户缓存信息
public void deleteUserCacheByUserId(String userId){
concurrentHashMap.remove(userId);
}
}
1.2 SocketIOServerHandler
用于监听客户端的建立连接请求和关闭连接请求
package com.gzgs.socketio.common.handler;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.gzgs.socketio.common.cache.ClientCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Slf4j
@Component
public class SocketIOServerHandler {
@Autowired
private ClientCache clientCache;
/**
* 建立连接
* @param client 客户端的SocketIO
*/
@OnConnect
public void onConnect(SocketIOClient client) {
//因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345
//下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏)
//因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345
//因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345
String userId = client.getHandshakeData().getSingleUrlParam("userId");
//同一个页面sessionid一样的
UUID sessionId = client.getSessionId();
//保存用户的信息在缓存里面
clientCache.saveClient(userId,sessionId,client);
log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功",userId,sessionId);
}
/**
* 关闭连接
* @param client 客户端的SocketIO
*/
@OnDisconnect
public void onDisconnect(SocketIOClient client){
//因为我定义用户的参数为userId,你也可以定义其他名称
String userId = client.getHandshakeData().getSingleUrlParam("userId");
//sessionId,页面唯一标识
UUID sessionId = client.getSessionId();
//clientCache.deleteUserCacheByUserId(userId);
//只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话
clientCache.deleteSessionClientByUserId(userId,sessionId);
log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功",userId,sessionId);
}
}
2、客户端
直接复制建立html文件,在浏览器打开就可以使用了
DOCTYPE html>
html>
head>
meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
title>SocketIO客户端测试环境title>
base>
script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js">script>
script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js">script>
style>
body {
padding: 20px;
}
#console {
height: 450px;
overflow: auto;
}
.connect-msg {
color: green;
}
.disconnect-msg {
color: red;
}
style>
head>
body>
h1>客户端测试环境h1>
hr style="height:1px;border:none;border-top:1px solid black;" />
div style="width: 700px; float: left">
h3>SocketClient建立连接h3>
div style="border: 1px;">
label>socketio服务端地址:label>
input type="text" id="url" value="http://localhost:9999?userId=12345" style="width: 500px;">
br>
br>
button id="connect" style="width: 100px;">建立连接button>
button id="disconnect" style="width: 100px;">断开连接button>
div>
hr style="height:1px;border:none;border-top:1px solid black;" />
h3>SocketClient发送消息h3>
div style="border: 1px;">
label>socketEvent名称:label>input type="text" id="socketEvent" value="getUserRooms">
br>br>
textarea id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容">textarea>
button id="send" style="width: 100px;">发送消息button>
div>
hr style="height:1px;border:none;border-top:1px solid black;" />
div>
div style="float: left;margin-left: 50px;">
h3>SocketIO互动消息h3>
button id="clean" style="width: 100px;">清理输出button>
div id="console" class="well">div>
div>
body>
script type="text/javascript">
var socket ;
var errorCount = 0;
var isConnected = false;
var maxError = 5;
//连接
function connect(url) {
//var opts = {
// query: 'userId='+userId
//};
//socket = io.connect(url, opts);
socket = io.connect(url);
//socket.nsp = "/socketIO";//定义命名空间
console.log(socket)
//监听本次连接回调函数
socket.on('connect', function () {
isConnected =true;
console.log("连接成功");
serverOutput(''+getNowTime()+' 连接成功');
errorCount=0;
});
//监听消息
socket.on('message', function (data) {
output(''+getNowTime()+' ' + data + ' ');
console.log(data);
});
//监听断开
socket.on('disconnect', function () {
isConnected =false;
console.log("连接断开");
serverOutput(''+getNowTime()+' ' + '已下线! ');
});
//监听断开错误
socket.on('connect_error', function(data){
serverOutput(''+getNowTime()+' ;' + '连接错误-'+data+' ');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
//监听连接超时
socket.on('connect_timeout', function(data){
serverOutput(''+getNowTime()+' ' + '连接超时-'+data+' ');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
//监听错误
socket.on('error', function(data){
serverOutput(''+getNowTime()+' ' + '系统错误-'+data+' ');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
/*socket.on('ack', function(data){
console.log("ack:"+data)
var str = '消息发送失败';
if(data==1){
str = '消息发送成功';
}
serverOutput(''+getNowTime()+' ' + str+' ');
});*/
}
function output(message) {
var element = $("" + " " + message + "
");
$('#console').prepend(element);
}
function serverOutput(message) {
var element = $("" + message + "
");
$('#console').prepend(element);
}
//连接
$("#connect").click(function(){
if(!isConnected){
var url = $("#url").val();
connect(url);
}else {
serverOutput(''+getNowTime()+' ' + '已经成功建立连接,不要重复建立!!! ');
}
})
//断开连接
$("#disconnect").click(function(){
if(isConnected){
socket.disconnect();
}
})
//发送消息
$("#send").click(function(){
var socketEvent = $("#socketEvent").val();//自定义的事件名称
var content = $("#content").val();//发送的内容
socket.emit(socketEvent,content,function(data1,data2){
console.log("ack1:"+data1);
console.log("ack2:"+data2);
});
})
//清理消息
$("#clean").click(function(){
$('#console').html("");
})
function getNowTime(){
var date=new Date();
var year=date.getFullYear(); //获取当前年份
var mon=date.getMonth()+1; //获取当前月份
var da=date.getDate(); //获取当前日
var h=date.getHours(); //获取小时
var m=date.getMinutes(); //获取分钟
var s=date.getSeconds(); //获取秒
var ms=date.getMilliseconds();
var d=document.getElementById('Date');
var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms;
return date;
}
script>
html>
html效果如下:
3、简单的演示
自己点击建立连接和断开连接按钮测试玩下
ps:http://localhost:9999?userId=12345是没有命名空间的请求
三、广播
1、SocketIO基础概念图
SocketIO、namespace(命名空间)、room(房间)的关系如下:
SocketIO广播是以namespace或者room为维度的,具体如下:
如果不定义namespace,默认是/
如果定义了namespace,没有定义room,房间默认的名字和namespace一样。
2、定义namespace
你也可以这样定义
server.addNamespace(“/test”);
server.addNamespace(“/socketIO”);
3、创建namespace所属的Handler
3.1 自定义Handler
package com.gzgs.socketio.common.handler;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TestHandler {
//测试使用
@OnEvent("testHandler")
public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
log.info("MyTestHandler:{}",data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("MyTestHandler",data);
}
}
}
package com.gzgs.socketio.common.handler;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SocketIOHandler {
//测试使用
@OnEvent("socketIOHandler")
public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
log.info("SocketIOHandler:{}",data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("SocketIOHandler",data);
}
}
}
3.2 监听自定义Handler
在启动类的SocketIO监听里面加入监听
package com.gzgs.socketio;
import com.corundumstudio.socketio.SocketIOServer;
import com.gzgs.socketio.common.handler.SocketIOHandler;
import com.gzgs.socketio.common.handler.TestHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;
@SpringBootApplication
public class SocketioServerApplication {
public static void main(String[] args) {
SpringApplication.run(SocketioServerApplication.class, args);
}
}
@Component
@Slf4j
class SocketIOServerRunner implements CommandLineRunner, DisposableBean {
@Autowired
private SocketIOServer socketIOServer;
@Autowired
private TestHandler testHandler;
@Autowired
private SocketIOHandler socketIOHandler;
@Override
public void run(String... args) throws Exception {
//namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。
//比如:http://localhost:9999/test?userId=12345
socketIOServer.getNamespace("/test").addListeners(testHandler);
socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler);
socketIOServer.start();
log.info("SocketIOServer==============================启动成功");
}
@Override
public void destroy() throws Exception {
socketIOServer.stop();
log.info("SocketIOServer==============================关闭成功");
}
}
3.3演示
3.3.1 正确演示
3.3.1 错误演示
四、常用方法
其他的一些测试我写在下面的代码上,自己去测试才能更好的理解
1、加入房间
//加入房间
@OnEvent("joinRoom")
public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
client.joinRoom(data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("加入房间","成功");
}
}
2、离开房间
//离开房间
@OnEvent("leaveRoom")
public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
client.leaveRoom(data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("离开房间","成功");
}
}
3、获取用户所有房间
//获取该用户所有房间
@OnEvent("getUserRooms")
public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SetString> allRooms = client.getAllRooms();
for (String room:allRooms){
System.out.println("房间名称:"+room);
}
log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ",userId,data,ackRequest.isAckRequested());
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("你好","哈哈哈");
}
}
4、发送消息给指定的房间
@OnEvent("sendRoomMessage")
public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SetString> allRooms = client.getAllRooms();
for (String room:allRooms){
log.info("房间:{}",room);
//发送给指定空间名称以及房间的人,并且排除不发给自己
socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message",client, data);
//发送给指定空间名称以及房间的人,包括自己
//socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);;
}
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("发送消息到指定的房间","成功");
}
}
5、广播消息给指定的Namespace下所有客户端
//广播消息给指定的Namespace下所有客户端
@OnEvent("sendNamespaceMessage")
public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message",client, data);;
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("发送消息到指定的房间","成功");
}
}
6、点对点发送
//点对点
public void sendMessageOne(String userId) throws JsonProcessingException {
HashMapUUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
for (UUID sessionId : userClient.keySet()) {
socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送");
}
}