目录
前言
技术栈
功能展示
一、springboot项目添加netty依赖
二、netty服务端
三、netty客户端
四、测试
五、代码仓库地址
专属小彩蛋:前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站(前言 – 床长人工智能教程)
前言
最近做了一个硬件设备通信项目,需求是这样,前端使用webSocket向后端进行tcp协议的通信,后端netty服务端收到数据后,将数据发往socket客户端,客户端收到数据之后需要进行响应数据显示到前端页面供用户进行实时监控。
技术栈
后端
- springboot
- netty
前端
- 前端websocket
功能展示
前端页面输入webSocket地址,点击连接,输入待发送的数据,点击发送
后端我们可以使用网络测试工具NetAssist 进行响应测试
在工具中连接netty服务端,并点击发送按钮,可以看到,前端页面右侧对话框成功显示出了NetAssist测试工具响应的数据内容。接下来我们来看一看代码如何进行实现,关键的点在于需要同时支持前端websocket和后端socket的连接,需要自定义一个协议选择处理器。
一、springboot项目添加netty依赖
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.7.12
com.example.dzx.netty
qiyan-project
0.0.1-SNAPSHOT
qiyan-project
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-configuration-processor
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-autoconfigure
2.6.7
io.netty
netty-all
4.1.52.Final
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok
二、netty服务端
(1)netty服务启动类
package com.example.dzx.netty.qiyanproject.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: netty服务启动类
* @date 2023年06月30日 21:27:16
*/
@Slf4j
@Component
public class NettyServer {
public void start(InetSocketAddress address) {
//配置服务端的NIO线程组
/*
* 在Netty中,事件循环组是一组线程池,用于处理网络事件,例如接收客户端连接、读写数据等操作。
* 它由两个部分组成:bossGroup和workerGroup。
* bossGroup 是负责接收客户端连接请求的线程池。
* workerGroup 是负责处理客户端连接的线程池。
* */
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建ServerBootstrap实例,boss组用于接收客户端连接请求,worker组用于处理客户端连接。
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 绑定线程池
.channel(NioServerSocketChannel.class)//通过TCP/IP方式进行传输
.childOption(ChannelOption.SO_REUSEADDR, true) //快速复用端口
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.localAddress(address)//监听服务器地址
.childHandler(new NettyServerChannelInitializer())
// .childHandler(new com.ccp.dev.system.netty.NettyServerChannelInitializer())
.childOption(ChannelOption.TCP_NODELAY, true)//子处理器处理客户端连接的请求和数据
.option(ChannelOption.SO_BACKLOG, 1024) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
.childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(address).sync();
future.addListener(l -> {
if (future.isSuccess()) {
System.out.println("Netty服务启动成功");
} else {
System.out.println("Netty服务启动失败");
}
});
log.info("Netty服务开始监听端口: " + address.getPort());
//关闭channel和块,直到它被关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("启动Netty服务器时出错", e);
} finally {
//释放资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
(2)服务端初始化类编写,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器以及各项处理器
package com.example.dzx.netty.qiyanproject.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器以及各项处理器
* @date 2023年06月30日 21:27:16
*/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer {
// private FullHttpResponse createCorsResponseHeaders() {
// FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//
// // 设置允许跨域访问的响应头
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, Authorization");
// response.headers().set(HttpHeaderNames.ACCESS_CONTROL_MAX_AGE, "3600");
//
// return response;
// }
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("active", new ChannelActiveHandler());
//Socket 连接心跳检测
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
pipeline.addLast("socketChoose", new SocketChooseHandler());
pipeline.addLast("commonhandler",new NettyServerHandler());
}
}
(3) 编写新建连接处理器
package com.example.dzx.netty.qiyanproject.server;
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: 客户端新建连接处理器
* @date 2023年06月30日 21:27:16
*/
@ChannelHandler.Sharable
@Slf4j
public class ChannelActiveHandler extends ChannelInboundHandlerAdapter {
/**
* 有客户端连接服务器会触发此函数
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
//获取客户端连接的远程地址
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
//获取客户端的IP地址
String clientIp = insocket.getAddress().getHostAddress();
//获取客户端的端口号
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (General.CHANNEL_MAP.containsKey(channelId)) {
log.info("Socket------客户端【" + channelId + "】是连接状态,连接通道数量: " + General.CHANNEL_MAP.size());
} else {
//保存连接
General.CHANNEL_MAP.put(channelId, ctx);
log.info("Socket------客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("Socket------连接通道数量: " + General.CHANNEL_MAP.size());
}
}
}
(4)编写协议初始化解码器,用来判定实际使用什么协议(实现websocket和socket同时支持的关键点就在这里)
package com.example.dzx.netty.qiyanproject.server;
/**
* @author 500007
* @ClassName:
* @Description:
* @date 2023年06月30日 21:29:17
*/
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* @author dzx
* @ClassName:
* @Description: 协议初始化解码器.用来判定实际使用什么协议,以用来处理前端websocket或者后端netty客户端的连接或通信
* @date 2023年06月30日 21:31:24
*/
@Component
@Slf4j
public class SocketChooseHandler extends ByteToMessageDecoder {
/** 默认暗号长度为23 */
private static final int MAX_LENGTH = 23;
/** WebSocket握手的协议前缀 */
private static final String WEBSOCKET_PREFIX = "GET /";
@Resource
private SpringContextUtil springContextUtil;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
(5)给NettyServerChannelInitializer初始化类中的commonhandler添加前置处理器
package com.example.dzx.netty.qiyanproject.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: 给NettyServerChannelInitializer初始化类中的commonhandler添加前置处理器
* @date 2023年06月30日 21:31:24
*/
@Component
public class PipelineAdd {
public void websocketAdd(ChannelHandlerContext ctx) {
System.out.println("PipelineAdd");
ctx.pipeline().addBefore("commonhandler", "http-codec", new HttpServerCodec());
ctx.pipeline().addBefore("commonhandler", "aggregator", new HttpObjectAggregator(999999999));
ctx.pipeline().addBefore("commonhandler", "http-chunked", new ChunkedWriteHandler());
// ctx.pipeline().addBefore("commonhandler","WebSocketServerCompression",new WebSocketServerCompressionHandler());
ctx.pipeline().addBefore("commonhandler", "ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
// ctx.pipeline().addBefore("commonhandler","StringDecoder",new StringDecoder(CharsetUtil.UTF_8)); // 解码器,将字节转换为字符串
// ctx.pipeline().addBefore("commonhandler","StringEncoder",new StringEncoder(CharsetUtil.UTF_8));
// HttpServerCodec:将请求和应答消息解码为HTTP消息
// ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
//
// // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
// ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(999999999));
//
// // ChunkedWriteHandler:向客户端发送HTML5文件,文件过大会将内存撑爆
// ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
//
// ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(999999999));
//
// //用于处理websocket, /ws为访问websocket时的uri
// ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
}
}
(6)编写业务处理器
package com.example.dzx.netty.qiyanproject.server;
import com.example.dzx.netty.qiyanproject.constants.General;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author dzx
* @ClassName:
* @Description: netty服务端处理类
* @date 2023年06月30日 21:27:16
*/
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler
(7)spring上下文工具类
package com.example.dzx.netty.qiyanproject.netty1;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author dzx
* @ClassName:
* @Description: spring容器上下文工具类
* @date 2023年06月30日 21:30:02
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @Description: 获取spring容器中的bean, 通过bean类型获取
*/
public static T getBean(Class beanClass) {
return applicationContext.getBean(beanClass);
}
}
(8)编写全局map常量类
package com.example.dzx.netty.qiyanproject.constants;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 500007
* @ClassName:
* @Description:
* @date 2023年07月02日 19:12:42
*/
public class General {
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
public static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap();
/**
* 管理一个全局mao, 报存连接进服务器的各个通道类型
*/
public static final ConcurrentHashMap CHANNEL_TYPE_MAP = new ConcurrentHashMap();
}
三、netty客户端
(1)编写netty客户端,用于测试向服务端的消息发送
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author dzx
* @ClassName:
* @Description: netty客户端
* @date 2023年06月30日 21:30:02
*/
public class SocketClient {
// 服务端IP
static final String HOST = System.getProperty("host", "127.0.0.1");
// 服务端开放端口
static final int PORT = Integer.parseInt(System.getProperty("port", "7777"));
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);
// 主函数启动
public static void main(String[] args) throws InterruptedException {
sendMessage("我是客户端,我发送了一条数据给netty服务端。。");
}
/**
* 核心方法(处理:服务端向客户端发送的数据、客户端向服务端发送的数据)
*/
public static void sendMessage(String content) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new SocketChannelInitializer());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
for (int i = 0; i
(2)编写netty客户端初始化处理器
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* @author dzx
* @ClassName:
* @Description: netty客户端初始化时设置出站和入站的编码器和解码器
* @date 2023年06月30日 21:30:02
*/
public class SocketChannelInitializer extends ChannelInitializer {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new SocketHandler());
}
}
(3)netty客户端业务处理器,用于接收并处理服务端发送的消息数据
package com.example.dzx.netty.qiyanproject.Socket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* @author dzx
* @ClassName:
* @Description: netty客户端处理器
* @date 2023年06月30日 21:30:02
*/
@Slf4j
public class SocketHandler extends ChannelInboundHandlerAdapter {
// 日志打印
private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) {
LOGGER.debug("SocketHandler Active(客户端)");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.debug("####接收服务端发送过来的消息####");
LOGGER.debug("SocketHandler read Message:" + msg);
//获取服务端连接的远程地址
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
//获取服务端的IP地址
String clientIp = insocket.getAddress().getHostAddress();
//获取服务端的端口号
int clientPort = insocket.getPort();
log.info("netty服务端[IP:" + clientIp + "--->PORT:" + clientPort + "]");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.debug("####客户端断开连接####");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
至此,netty服务端和netty客户端都编写完毕,我们可以来进行测试了。
四、测试
(1)前端 websocket 向 后端NetAssist测试工具发送消息
在前端窗口向后端 发送了一个 22222的 字符串,后端测试工具成功接收到消息并展示在对话框中。
(2)后端NetAssist向 前端 websocket 发送消息
在后端窗口向前端 发送了一个{“deviceId”:”11111″,”deviceName”:”qz-01″,”deviceStatus”:”2″}的 字符串,前端测试工具成功接收到消息并展示在对话框中。
五、代码仓库地址
完整项目已上传至gitee仓库,请点击下方传送门自行获取,一键三连!!
https://gitee.com/dzxmy/netty-web-socketd-dnamic
无法访问就点击下方传送门去我的资源下载即可
https://download.csdn.net/download/qq_31905135/88044942?spm=1001.2014.3001.5503