目录
一、webflux介绍
1.1 什么是webflux
1.2 什么是响应式编程
1.3 webflux特点
二、Java9中响应式编程
2.1 定义事件流源
2.2 实现订阅者
三、Spring Webflux介绍
四、Reactor 介绍
五、Reactor 常用API操作
5.1 Flux 创建流操作API
5.2 Flux响应流的订阅
5.3 Flux处理实时流
六、Spring Webflux 使用
6.1 Spring Webflux简介
6.1 Spring Webflux中的核心组件
6.2 Spring Webflux基于注解的实现
6.2.1 引入核心依赖
6.2.2 核心业务类
6.2.3 核心接口类
6.3 Spring Webflux 函数式编程实现
6.3.1 自定义handler
6.3.2 自定义server服务器
6.3.3 访问效果测试
6.3.4 使用webclient调用
6.4 Spring Boot RouterFunction 整合方式一
6.5 Spring Boot RouterFunction 整合方式二
6.5.1 静态化改造
七、webflux的使用场景
八、写在文末
一、webflux介绍
1.1 什么是webflux
webflux,即响应式编程。在JDK9中开始引入了响应式编程模型,而spring5.0版本之后正式引入对webflux的支持,即spring webflux,spring webflux是spring在5.0版本后提供的一套响应式编程风格的web开发框架。
1.2 什么是响应式编程
响应式编程是一种用于处理异步数据流和事件的编程范式。它的核心思想是将数据流看作是一系列事件的序列,通过对事件流的处理来实现计算。它强调基于事件的异步处理和函数式编程的思想,可以帮助开发人员更好地处理复杂的应用程序逻辑。
而响应式编程,其实就是为这种异步非阻塞的流式编程制定的一套标准。流式编程已不陌生了,Java8提供的stream api就是这种风格。这套标准包括对运行环境(JVM、JavaScript)以及网络协议相关的规范。
1.3 webflux特点
非阻塞式
在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
函数式编程
Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求
二、Java9中响应式编程
Java 9引入了Flow API作为响应式编程的标准实现,具体来说:
-
Flow API提供了一组接口和类,用于定义和处理数据流;
-
它基于Publisher-Subscriber模式,其中Publisher生成事件流并发布给Subscriber进行处理。
如果使用Java9中的响应式编程进行实现,核心需要两步:
- 定义事件流源;
- 实现订阅者;
下面来看一段具体的实现代码。
2.1 定义事件流源
在Flow API中,事件流源被定义为Publisher的实现类,具体来说,首先需要创建一个类实现Publisher接口,并重写其subscribe()方法。在subscribe()方法中,可以通过调用Subscriber的onSubscribe()方法来将事件流订阅给Subscriber。
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class EventPublisher implements Flow.Publisher {
@Override
public void subscribe(Flow.Subscriber super String> subscriber) {
subscriber.onSubscribe(new SimpleSubscription(subscriber));
}
}
2.2 实现订阅者
订阅者是实现Subscriber接口的类。在Flow API中,只需要实现Subscriber接口的onNext()、onError()和onComplete()方法;
-
当事件流发出下一个元素时,onNext()方法将被调用;
-
当发生错误时,onError()方法将被调用;
-
当事件流结束时,onComplete()方法将被调用;
在这些方法中,我们可以根据业务需要添加处理事件流的数据相关逻辑。
import java.util.concurrent.Flow;
public class EventSubscriber implements Flow.Subscriber {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Received item: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Event stream completed.");
}
}
测试代码
import java.util.concurrent.Flow;
public class Main {
public static void main(String[] args) {
EventPublisher publisher = new EventPublisher();
EventSubscriber subscriber = new EventSubscriber();
publisher.subscribe(subscriber);
publisher.submit("Event 1");
publisher.submit("Event 2");
publisher.submit("Event 3");
publisher.close();
}
}
三、Spring Webflux介绍
是Spring5添加新的模块,用于web开发的,功能和SpringMVC类似的,Webflux使用当前一种比较流程响应式编程出现的框架。spring官方文档地址:Web on Reactive Stack :: Spring Framework
spring-webflux是spring web框架体系中的一个组成模块,说起这个WebFlux,不难会拿出来与Spring Web与WebMvc进行比较,因为在目前很多项目开发中,仍然会使用WebMVC进行开发,尽管springboot成为基础的开发框架,但是接口开发中核心组件还是WebMVC的进一步封装。
四、Reactor 介绍
可以这么理解,响应式编程中的核心实现在于Reactor 的实现和应用,具体来说,Reactor是满足Reactive规范框架。具体来说:
-
对响应式流规范的一种实现;
-
Spring Webflux默认的响应式框架;
-
完全异步非阻塞,对背压的支持;
-
提供两个异步序列API,Flux[N]和Mono[0|1];
-
提供对响应式流的操作;
在Reactor中,有两个核心类,Flux和Mono ,这两个类实现接口 Publisher,提供丰富操作符。
-
Flux 对象实现发布者,返回 N 个元素,即产生0到N个元素的异步序列;
-
Mono 实现发布者,返回 0 或者 1 个元素,即产生至多一个元素的异步序列。
Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
三种数据信号特点:
- 错误信号和完成信号都是终止信号,不能共存的;
- 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
- 如果没有错误信号,没有完成信号,表示是无限数据流;
五、Reactor 常用API操作
接下来通过实际操作来演示下基于Reactor 常用的API的使用。引入如下依赖包。
io.projectreactor
reactor-core
3.5.5
5.1 Flux 创建流操作API
在上面提到,如果你需要创建多于一个元素的异步序列,可以考虑使用Flux 相关API,下面是使用Flux 的创建多种形式流的操作
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
public class ReactorApi {
@Test
public void fluxJust() {
Flux phones = Flux.just("小米", "三星", "华为");
}
@Test
public void fluxFromIterable() {
Flux phones = Flux.fromIterable(Arrays.asList("小米", "三星", "华为"));
}
@Test
public void fluxFromArray() {
Flux phones = Flux.fromArray(new String[]{"小米", "三星", "华为"});
}
@Test
public void fluxFromStream() {
Flux phones = Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"}));
phones.subscribe();
phones.subscribe(); //只能被订阅一次
}
@Test
public void fluxEmpty() {
Flux phones = Flux.empty(); //generic type still honored
}
@Test
public void fluxRange() {
Flux phones = Flux.range(5, 3);
}
@Test
public void fluxGenerate() {
Flux flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next(i);
if (i == 10) sink.complete();
return state;
},
(state) -> System.out.println("done")
);
flux.subscribe(System.out::println);
}
@Test
public void fluxCreate() {
Flux phones = Flux.create((t) -> {
t.next("小米");
t.next("三星");
t.next("华为");
t.complete();
});
phones.subscribe(System.out::println);
System.out.println("------------");
Flux ownFluxListener = Flux.create(sink -> {
//传入自定义的方法
new MyDataListener(){
public void onReceiveData(String str){
sink.next(str);
}
public void onComplete(){
sink.complete();
}
};
}, FluxSink.OverflowStrategy.DROP);
ownFluxListener.subscribe(System.out::println);
}
public class MyDataListener{
public void onReceiveData(String str){
System.out.println("收到数据:"+str);
}
public void onComplete(){
System.out.println("完成数据的消费处理");
}
}
@Test
public void fluxDefer() {
Flux.defer(() -> Flux.just("小米", "三星", "华为"))
.subscribe(System.out::println);
Flux stockSeq4 = Flux.defer(() -> Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"})));
stockSeq4.subscribe();
stockSeq4.subscribe();
}
@Test
public void fluxInterval() throws InterruptedException {
//interval 定时发送元素
Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
.subscribe((t) -> System.out.println((String.valueOf(t))));
Thread.sleep(1000000);
}
}
5.2 Flux响应流的订阅
在上面的操作API中,调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。接下来,看看如何订阅和操作这些流。
import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public class ReactorStreamApi {
/**
* 流的map操作
*/
@Test
public void streamMap() {
Flux ints = Flux.range(1, 4);
Flux mapped = ints.map(i -> i * 2);
mapped.subscribe(System.out::println);
}
/**
* 带有异常情况的处理
*/
@Test
public void withError() {
Flux ints = Flux.range(1, 4)
.map(i-> {
if(i System.out.println(i),
err -> System.out.println("error : " + err.getMessage()),
() -> System.out.println("完成订阅和数据的消费")
);
}
@Test
public void testSubscribeWithBase(){
Flux ints = Flux.range(1, 4);
ints.subscribe(new MySubscriber());
}
public class MySubscriber extends BaseSubscriber {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("MySubscriber");
request(1);
}
@Override
protected void hookOnNext(T value) {
System.out.println(value.toString());
request(1);
}
}
/**
* 流的filter操作
*/
@Test
public void streamFilter() {
Flux ints = Flux.range(1, 4);
Flux filtered = ints.filter(i -> i % 2 == 0);
filtered.subscribe(System.out::println);
}
@Test
public void streamBuffer() {
Flux ints = Flux.range(1, 40);
Flux> buffered = ints.buffer(3);
buffered.subscribe(System.out::println);
}
@Test
public void streamRetry() {
Mono client = Mono.fromSupplier(() -> {
double num = Math.random();
if (num > 0.01) {
throw new Error("Network issue");
}
return "https://www.baidu.com";
});
client.retry(3).subscribe(System.out::println);
}
/**
* 响应式流的合并
*/
@Test
public void streamZip(){
Flux fluxA = Flux.range(1, 4);
Flux fluxB = Flux.range(5, 5);
fluxA
.zipWith(fluxB, (a, b)-> a+b)
.subscribe(System.out::println);
}
}
5.3 Flux处理实时流
对于某些需要实时处理的场景,可以考虑Flux的实时流的处理
import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
public class StreamTest {
@Test
public void simpleHotStreamCreation() {
Sinks.Many hotSource = Sinks.unsafe().many().multicast().directBestEffort();
//转为flux
Flux hotFlux = hotSource.asFlux();
//订阅数据
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));
hotSource.emitNext(1, FAIL_FAST);
hotSource.tryEmitNext(2).orThrow();
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));
hotSource.emitNext(3, FAIL_FAST);
hotSource.emitNext(4, FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
}
@Test
public void connectableFlux() throws InterruptedException {
Flux source = Flux.range(1, 4);
ConnectableFlux connectableFlux = source.publish();
connectableFlux.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
connectableFlux.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
System.out.println("Finish subscribe action");
Thread.sleep(1000L);
System.out.println("Connect to Flux now");
connectableFlux.connect();
}
@Test
public void autoConnectConnectableFlux() throws InterruptedException {
Flux source = Flux.range(1, 4);
Flux autoConnect = source.publish().autoConnect(2);
autoConnect.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
System.out.println("Finish subscriber 1 action");
Thread.sleep(1000L);
System.out.println("Start subscriber 2 action");
autoConnect.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
}
}
六、Spring Webflux 使用
6.1 Spring Webflux简介
在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况。因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。
在servlet3.0标准之后,为解决此类问题,提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他请求使用,这样的操作机制将极大的提升程序的并发性能。
对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。
在spring中实现响应式编程,就需要使用到spring webFlux。该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式(编程式实现),另一种是基于SpringMVC注解方式。
6.1 Spring Webflux中的核心组件
Spring Webflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻 塞的框架。Spring Webflux 执行过程和 SpringMVC 相似的 Spring Webflux 核心控制器 DispatchHandler,实现接口 WebHandler。
SpringWebflux 里面 DispatcherHandler,负责请求的处理,
-
HandlerMapping:请求查询到处理的方法
-
HandlerAdapter:真正负责请求处理
-
HandlerResultHandler:响应结果处理
SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)和 HandlerFunction(处理函数)
6.2 Spring Webflux基于注解的实现
6.2.1 引入核心依赖
注意,如果是在springboot项目中提供web接口,引入了下面的依赖之后就不要引入spring-boot-starter-web依赖了。
org.springframework.boot
spring-boot-starter-webflux
6.2.2 核心业务类
使用webflux编写web接口,与普通的rest-api类似,只是在webflux,返回值不再是对象或其他数据类型,而是Flux或Mono包装的数据对象。
import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Service
public class BookServiceImpl implements BookService {
//创建 map 集合存储数据
private final Map books = new HashMap();
public BookServiceImpl() {
this.books.put("01",new BookInfo("01","Java",20));
this.books.put("02",new BookInfo("02","Js",30));
this.books.put("03",new BookInfo("03","Hadoop",50));
}
@Override
public Mono getById(String id) {
return Mono.justOrEmpty(this.books.get(id));
}
@Override
public Flux getAll() {
return Flux.fromIterable(this.books.values());
}
@Override
public Mono saveBookInfo(Mono bookInfoMono) {
return bookInfoMono.doOnNext(book -> {
//向 map 集合里面放值
int id = books.size()+1;
books.put(String.valueOf(id),book);
}).thenEmpty(Mono.empty());
}
}
6.2.3 核心接口类
import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class BookController {
@Autowired
private BookService bookService;
//根据ID查询 http://localhost:8082/book/01
@GetMapping("/book/{id}")
public Mono getById(@PathVariable String id) {
return bookService.getById(id);
}
//查询所有 http://localhost:8082/findAll
@GetMapping("/findAll")
public Flux getUsers() {
return bookService.getAll();
}
@PostMapping("/save")
public Mono save(@RequestBody BookInfo user) {
Mono userMono = Mono.just(user);
return bookService.saveBookInfo(userMono);
}
}
选择其中一个接口测试,可以看到效果与传统的API接口返回值并无差别
补充说明:
1)SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat;
2)SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty;
6.3 Spring Webflux 函数式编程实现
在使用函数式编程模型操作时候,需要自己初始化服务器,基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。
Spring Webflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse ,而是ServerRequest 和 ServerResponse
熟悉Netty的同学对Netty的编码风格不陌生,在编写Netty的服务时,也需要自定义Handler,然后将这个自定义Handler配置到启动配置参数中,因此可以同样的方式来理解Spring Webflux的函数式编程的套路。
6.3.1 自定义handler
可以这么理解,在这个handler类中,其实就是对底层的业务方法进一步的封装,只不过返回的数据类型为Mono或Flux;
import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class BookHandler {
private BookService bookService;
public BookHandler(BookService bookService) {
this.bookService = bookService;
}
/**
* 根据ID查询
* @param serverRequest
* @return
*/
public Mono getBookById(ServerRequest serverRequest) {
String id = serverRequest.pathVariable("id");
Mono bookInfoMono = this.bookService.getById(id);
Mono noDataRes = ServerResponse.notFound().build();
return bookInfoMono.flatMap(bookInfo ->
ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(bookInfo, BookInfo.class)
.switchIfEmpty(noDataRes)
);
}
/**
* 获取所有
* @return
*/
public Mono getAllBooks(ServerRequest serverRequest) {
//调用 service 得到结果
Flux bookInfoFlux = this.bookService.getAll();
return
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(bookInfoFlux,BookInfo.class);
}
/**
* 保存数据
* @param request
* @return
*/
public Mono saveUser(ServerRequest request) {
//得到 user 对象
Mono bookInfoMono = request.bodyToMono(BookInfo.class);
return
ServerResponse.ok().build(this.bookService.saveBookInfo(bookInfoMono));
}
6.3.2 自定义server服务器
该类的作用就相当于是netty编程中,通过ServerBootstrap创建一个服务器类似;
import com.congge.handler.BookHandler;
import com.congge.service.BookService;
import com.congge.service.impl.BookServiceImpl;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.http.server.HttpServer;
import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;
public class BookServer {
public RouterFunction routerFunction() {
BookService bookService = new BookServiceImpl();
BookHandler bookHandler = new BookHandler(bookService);
//设置路由
/* return RouterFunctions.route(
GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById)
.andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::get
AllUsers);*/
return RouterFunctions.route(
RequestPredicates.GET("/users/{id}")
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
bookHandler::getBookById
).andRoute(
RequestPredicates.GET("/users/{id}")
.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
bookHandler::getAllBooks
);
}
public void createReactorServer() {
//路由和 handler 适配
RouterFunction route = routerFunction();
HttpHandler httpHandler = toHttpHandler(route);
ReactorHttpHandlerAdapter adapter = new
ReactorHttpHandlerAdapter(httpHandler);
//创建服务器
HttpServer httpServer = HttpServer.create();
httpServer.handle(adapter).bindNow();
}
public static void main(String[] args) throws Exception{
BookServer server = new BookServer();
server.createReactorServer();
System.out.println("enter to exit");
System.in.read();
}
}
在该类的最后,编写了一个main函数,运行这个main程序,注意日志中的端口号,因为接下来将通过这个端口进行访问;
6.3.3 访问效果测试
访问接口:localhost:51315/book/01
6.3.4 使用webclient调用
也可以编写webclient调用上面的接口,代码如下
import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
public class ClientTest {
public static void main(String[] args) {
//调用服务器地址
WebClient webClient = WebClient.create("http://127.0.0.1:51315");
//根据 id 查询
String id = "01";
BookInfo bookInfo = webClient.get().uri("/book/{id}", id)
.accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(BookInfo.class)
.block();
System.out.println(bookInfo.getName());
//查询所有
Flux results = webClient.get().uri("/book/findAll").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(BookInfo
.class);
results.map(stu -> stu.getName())
.buffer().doOnNext(System.out::println).blockFirst();
}
}
6.4 Spring Boot RouterFunction 整合方式一
上面是通过自定义handler的方式实现了Spring Webflux函数式编程,如果直接在springboot中直接集成怎么做呢,只需要通过自定义配置bean的方式,将路由配置进去即可;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class MyRoutesConfig {
@Bean
RouterFunction index() {
return route(GET("/index"), request -> ok().body(fromObject("Hello Index")));
}
@Bean
RouterFunction about() {
return route(GET("/about"), request -> ok().body(fromObject("About page")));
}
}
当然里面的逻辑非常简单,实际使用时,可以在每个bean中补充更复杂的逻辑,比如调用其他业务类的逻辑,同样我们启动springboot应用后访问下端点/index,看到下面的效果。
6.5 Spring Boot RouterFunction 整合方式二
紧接着上面的案例,下面使用更通用的做法来完成与RouterFunction
的整合,首先还是自定义一个handler,这种自定义的配置类形式handler好处是可以注入其他业务类,从而实现更复杂的逻辑。
import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
public class ApiHandler {
public Mono getNewBooks(ServerRequest serverRequest) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(
Flux.create(sink ->{
sink.next(new BookInfo("05","mysql",90));
sink.next(new BookInfo("06","flink",78));
sink.next(new BookInfo("07","php",66));
sink.complete();
}),BookInfo.class
);
}
public Mono getBookById(ServerRequest serverRequest) {
String bookId = serverRequest.pathVariable("id");
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(
Mono.just(
new BookInfo(bookId,"python",57)
),BookInfo.class
);
}
}
自定义routerFunction,可以这么理解,通过这个类,就不用再单独编写一个controller,从而实现与普通的controller类中一样定义接口的功能。
import com.congge.handler.ApiHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class ApiRouterFunction {
@Bean
public RouterFunction apiRoute(ApiHandler apiHandler){
return route(
GET("/book/getBookById/{id}"),
apiHandler::getBookById
).and(
route(
GET("/book/getNewBooks"),
apiHandler::getNewBooks
));
}
}
启动项目之后,我们来访问一下其中的一个接口,效果与普通的接口效果类似。
6.5.1 静态化改造
如果你不希望上面的自定义handler和routerConfig与spring框架耦合的太紧密,也可以将其做成静态化的配置,通过app启动的时候自动注册,只需去掉spring相关的注解,然后在app启动类注册进去即可。
ApiRouterFunction改造,将原本的配置bean方法修改为static 如下代码
public class ApiRouterFunction {
public static RouterFunction apiRoute(){
ApiHandler apiHandler = new ApiHandler();
return route(
GET("/book/getBookById/{id}"),
apiHandler::getBookById
).and(
route(
GET("/book/getNewBooks"),
apiHandler::getNewBooks
));
}
}
ApiHandler改造,去掉配置注解,启动类改造如下
public static void main(String[] args) {
new SpringApplicationBuilder()
.sources(FluxApp.class)
.initializers((ApplicationContextInitializer) ctx ->{
ctx.registerBean("apiRoute",RouterFunction.class,ApiRouterFunction::apiRoute);
})
.run(args)
;
}
再次启动后调用相同的接口,仍然可以得到正确的响应结果
七、webflux的使用场景
通过上面关于webflux的使用了解到webflux的强大之处,其实在很多中间件,微服务组件中都随处可见webflux的响应式编程的影子,比如在springcloud gateway网关中,网关作为流量的入口,为了持续提升整体服务的高性能、高吞吐、高并发的请求,在处理请求拦截、路由转发等方面使用webflux。如下这段代码,就是gateway中自定义过滤器的一段配置;
@Component
@Slf4j
public class LogFilter implements GlobalFilter {
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info(exchange.getRequest().getPath().value());
return chain.filter(exchange);
}
}
结合实际经验,对于下面的这些场景,可以考虑使用webflux解决:
- Spring WebFlux 是一个异步非阻塞式的 Web 框架,所以,它特别适合应用在 IO 密集型的服务中,比如像上面提到的微服务网关这样的应用中;
- 硬件资源扩充困难,但又希望提升系统整体的吞吐量,可以考虑使用webflux,因为WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动;
- 一些对请求响应时间要求不高,但是并发较大的异步场景;
注意
WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。
八、写在文末
从WebFlux 的发展以及在众多的Java生态组件中广泛使用来看,WebFlux 的流行趋势已经到来,因此掌握WebFlux 的核心原理和思想,在日常工作开发中,在某些特殊的场景下能够提供很好的解决思路,当然WebFlux 涉及到的技术点还有很多,比如对websocket的支持等,有兴趣的同学可以继续参阅相关资料深入学习,本篇到此结束,感谢观看。