SpringBoot 集成 WebFlux,请求大模型实现 “打字机” 流式响应效果
- 一、WebFlux 简介
-
- 1.1 Spring MVC 和 Spring WebFlux 的区别?
- 1.2 Spring MVC 和 Spring WebFlux 是否可以同时存在?
- 1.3 WebFlux 之 Reactor 框架
- 二、WebFlux 实战,实现 “打字机” 流式响应效果
-
- 2.1 pom依赖
- 2.2 Controller 核心代码
- 2.3 Service 核心代码
- 2.4 问题记录
-
- 2.4.1 Nginx缓存问题
一、WebFlux 简介
WebFlux官方文档
1.1 Spring MVC 和 Spring WebFlux 的区别?
Spring MVC 和 Spring WebFlux 都是Spring框架中用于构建Web应用程序的模块,它们之间的主要区别在于它们处理并发请求的方式和所采用的编程模型。
1、编程模型:
- Spring MVC: 采用同步的、阻塞的编程模型。每个请求都会在一个单独的线程中处理,线程会一直阻塞直到请求完成。
- Spring WebFlux: 采用异步的、非阻塞的编程模型。它基于Reactive Streams标准,使用反应式编程的理念,可以更有效地处理大量并发请求,减少线程资源的浪费。
2、并发处理:
- Spring MVC: 使用Servlet API中的阻塞IO来处理请求,每个请求需要一个独立的线程,如果线程池中的线程用尽,新的请求就会被阻塞。
- Spring WebFlux: 使用非阻塞IO,通过少量的线程处理大量的并发请求。这可以提高系统的吞吐量,因为不需要为每个请求分配一个独立的线程。
3、适用场景:
- Spring MVC: 适用于传统的同步IO的应用场景,特别是那些对实时性要求不是很高的场景。
- Spring WebFlux: 适用于需要处理大量并发请求、对实时性要求高的场景,比如实时通信、实时数据推送等。
总的来说,如果需要处理大量并发请求,并且对实时性有较高要求,可以选择Spring WebFlux
1.2 Spring MVC 和 Spring WebFlux 是否可以同时存在?
答案:可以同时存在。如官方文档所述:
1.3 WebFlux 之 Reactor 框架
在Spring WebFlux中,Reactor是一种基于响应式编程的框架,用于处理异步数据流。Reactor实际上是一个库,它提供了一组用于处理反应式流的API和工具。Spring WebFlux通过Reactor来实现响应式的Web编程模型。
以下是Reactor的一些关键概念:
1、Flux(流): Flux是Reactor中表示包含零个或多个元素的异步序列的主要类型。它可以发出零到N个元素,并且可以是无限的。在WebFlux中,Flux通常用于表示请求和响应的数据流。
FluxInteger> numbers = Flux.just(1, 2, 3, 4, 5);
2、Mono(单值): Mono是Reactor中表示包含零个或一个元素的异步序列的类型。它可以发出零或一个元素,类似于Java 8中的Optional。
MonoString> value = Mono.just("Hello, Reactor!");
3、Scheduler(调度器): Reactor提供了调度器来控制异步操作的执行。调度器用于在不同的线程或线程池中执行操作,以避免阻塞。
Scheduler scheduler = Schedulers.parallel();
4、操作符(Operators): Reactor提供了丰富的操作符,用于在Flux和Mono上执行各种转换和操作。这些操作符包括映射、过滤、合并、错误处理等。
FluxInteger> doubled = numbers.map(n -> n * 2);
二、WebFlux 实战,实现 “打字机” 流式响应效果
2.1 pom依赖
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-webfluxartifactId>
dependency>
2.2 Controller 核心代码
@RestController
@RequestMapping("/chat")
@Api(value = "会话管理", tags = "会话管理")
public class ChatMsgController {
@WebLog
@ApiOperation("主对话")
@PostMapping(value = "/main", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public FluxString> chat(@Valid @RequestBody ChatParam param) {
// ...
}
}
关键点:
1)设置接口返回值:Flux
2)设置接口响应MediaType:TEXT_EVENT_STREAM_VALUE
2.3 Service 核心代码
问答回调接口ChatCallBack:
public interface ChatCallBack {
/**
* 流式处理过程中回调
*/
String onNext(String data, ChatContext context);
/**
* 流式处理完成回调
*/
void completed(ChatContext context);
/**
* 流式处理失败回调
*/
void error(ChatContext context);
}
问答监听类ChatSubscriber:
@Slf4j
public class ChatSubscriber implements SubscriberString>, Disposable {
private final FluxSinkString> emitter;
private Subscription subscription;
private final ChatContext context;
private final ChatCallBack callBack;
public ChatSubscriber(FluxSinkString> emitter,
ChatCallBack callBack, ChatContext context) {
this.emitter = emitter;
this.callBack = callBack;
this.context = context;
this.context.setAnswer(new StringBuilder());
this.context.setDocs(new StringBuilder());
}
@Override
public void onSubscribe(Subscription subscription) {
// 订阅开始,初始化subscription
this.subscription = subscription;
// 请求接收一个数据项
subscription.request(1);
}
@Override
public void onNext(String data) {
try {
data = callBack.onNext(data, context);
} catch (Exception e) {
log.error("大模型问答异常", e);
} finally {
// todo 临时打印日志
log.info("=============== data: {}", data);
// 将数据发送给前端
emitter.next(data);
// 继续请求接收下一个数据项
subscription.request(1);
}
}
@Override
public void onError(Throwable t) {
// 处理数据流完成后的回调逻辑
try {
callBack.error(context);
log.error("大模型问答异常", t);
} catch (Exception e) {
log.error("大模型问答异常", e);
} finally {
dispose();
}
}
@Override
public void onComplete() {
// 处理数据流完成后的回调逻辑
try {
callBack.completed(context);
log.info("大模型问答订阅结束");
} catch (Exception e) {
log.error("大模型问答异常", e);
} finally {
dispose();
}
}
@Override
public void dispose() {
// 数据流结束,取消订阅
emitter.complete();
}
}
问答抽象模板类AbstractChatService:
@Slf4j
@Component
public abstract class AbstractChatService implements ChatCallBack {
private WebClient webClient;
@PostConstruct
private void init() {
webClient = WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json").build();
}
/**
* 问答统一入口
*/
public FluxString> request(ChatContext context) {
return Flux.create(emitter -> {
FluxString> response = this.doRequest(buildRequest(context));
ChatSubscriber subscriber = new ChatSubscriber(emitter, this, context);
response.subscribe(subscriber);
emitter.onDispose(subscriber);
});
}
/**
* 使用WebClient请求大模型流式响应接口
*/
private FluxString> doRequest(Request request) {
log.info("请求大模型开始,URL:{}, 参数:{}", request.getUrl(), request.getJsonBody());
return webClient.post()
.uri(request.getUrl())
.accept(MediaType.TEXT_EVENT_STREAM)
.bodyValue(request.getJsonBody())
.retrieve()
.bodyToFlux(String.class)
.onErrorResume(WebClientResponseException.class, ex -> {
log.error("请求大模型接口异常", ex);
return Flux.just(JsonUtils.toJson(answer(CommonError.GLOBAL_ERROR.getMsg())));
});
}
}
问答实现类ChatService:
@Slf4j
@Service("chatService")
public class ChatService extends AbstractChatService {
@Override
public String onNext(String data, ChatContext context) {
// 解析响应数据项
JsonNode jsonNode = JsonUtils.toJsonNode(data);
if (jsonNode == null || !jsonNode.isObject()) {
return data;
}
ObjectNode objectNode = (ObjectNode) jsonNode;
// 处理响应
objectNode.put(...);
// ...
return JsonUtils.toJson(jsonNode);
}
@Override
public void completed(ChatContext context) {
// 更新对话
updateChatMsg(context);
// 保存对话关联的索引
saveChatMsgIndex(context);
}
@Override
public void error(ChatContext context) {
// 更新对话
updateChatMsg(context);
}
}
2.4 问题记录
2.4.1 Nginx缓存问题
- 现象:答案一段一段的输出且卡顿明显,正常应该是一个字一个字的输出。
- 原因:项目使用了Nginx,Nginx的
proxy_buffering
配置是默认启用的,这个配置开启后会存在一个代理缓冲区。 - 解决方案:关闭
proxy_buffering
配置,设置为off。