我是靠谱客的博主 懵懂可乐,这篇文章主要介绍Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现背景开搞测试欢迎留言指正、讨论,现在分享给大家,希望可以做个参考。

Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现

  • 背景
  • 开搞
    • 项目pom.xml
    • 调用处理方法
    • 自定义注解(类~@FeignClient)
    • 使用注解定义服务调用方法
    • 编写动态代理
      • 创建代理接口
      • 编写代理实现
  • 测试
    • 自动注入获取业务接口实例
    • 方法调用
  • 欢迎留言指正、讨论

经验总结,如有错误请指正!

背景

Spring Gateway v3.0.1为响应式框架,但是在微服务中避免不了微服务之间相互调用,但微服务大部分都是Spring Boot开发的,属于阻塞式框架,所以这种情况就属于响应式程序调用阻塞式程序提供的接口;

OpenFeign是阻塞式的,所以没办法在Gateway中直接使用,官方文档对响应式支持这样写的;
Spring Cloud Openfeign | Reactive Support
在这里插入图片描述
意思就是有个 feign-reactive 的框架可以支持,这个框架是PlaytikaOSS根据OpenFeign开发的一个框架;

但是我没有使用,我想自己封装一下,然后就可以实现像OpenFeign那样调用就可以了,也是增长一下水平,响应式框架的Http客户端有WebClient,可以基于WebClient调用封装一个类似OpenFeign的小实现(参考Openfeign)

开搞

项目pom.xml

本次介绍的封装实现主要是依赖nacos、gateway、loadbalancer,如下所示,其余的按需自行增配

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!--nacos--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!--spring cloud gateway--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-commons</artifactId> </dependency> <!--客户端负载均衡loadbalancer--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> </dependency> <!-- loadbalancer 缓存优化 --> <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>

调用处理方法

先编写实际处理业务接口调用的WebClient实现方法,这里写了一个简单的工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j @Component public class SmartWebClient { private final WebClient smartWebClient; // ReactorLoadBalancerExchangeFilterFunction是LoadBalance的一个负载 // 可以适配nacos的服务发现,将其注入到WebClient即可实现服务名称调用及负载 public SmartWebClient(ReactorLoadBalancerExchangeFilterFunction lbFunction) { this.smartWebClient = WebClient.builder() .filter(lbFunction) // 默认请求头 .defaultHeader(WEB_CLIENT_HEADER, WEB_CLIENT_HEADER_VALUE) .build(); } // TODO 调用方法,响应Mono类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子 public <T> Mono<T> restMono(HttpMethod httpMethod, String url, Map<String, String> params, MultiValueMap<String, String> body, MediaType reqMediaType, MediaType respMediaType, Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) { return smartWebClient .method(httpMethod) // post get put …… .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数 .accept(respMediaType) // contentType .contentType(reqMediaType) // contentType // 这里可以放入BodyInserters.from……多种方法,按需使用 .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收 .retrieve() // 发送请求 // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常> .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux .bodyToMono(resultType); } // TODO 调用方法,响应Flux类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子 public <T> Flux<T> restFlux(HttpMethod httpMethod, String url, Map<String, String> params, MultiValueMap<String, String> body, MediaType reqMediaType, MediaType respMediaType, Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) { return smartWebClient .method(httpMethod) // post get put …… .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数 .accept(respMediaType) // contentType .contentType(reqMediaType) // contentType // 这里可以放入BodyInserters.from……多种方法,按需使用 .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收 .retrieve() // 发送请求 // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常> .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux .bodyToFlux(resultType); } }

实体

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Data public class WebClientRestInfo<T> { /** * 请求方式 */ private HttpMethod httpMethod = HttpMethod.GET; /** * 请求地址 */ private String url; /** * 请求path参数 */ private Map<String, Object> pathVariable; /** * 请求表单/Body */ private MultiValueMap<String, Object> formValues; /** * 编码 */ private MediaType reqMediaType = MediaType.APPLICATION_JSON; private MediaType respMediaType = MediaType.APPLICATION_JSON; /** * 是否为Flux */ private boolean isFlux = false; /** * 返回值类型 */ private Class<T> resultType; }

自定义注解(类~@FeignClient)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface ReactiveFeignClient { String name(); String path(); Class<?> fallback() default void.class; }

使用注解定义服务调用方法

和Openfeign一样,从其他服务的controller参考创建服务接口即可,实体在服务内创建即可,但返回结果需要加上Mono(单个结果)或者Flux(多个结果),这是响应式编程必要的

复制代码
1
2
3
4
5
6
7
@ReactiveFeignClient(name = "/order-service", path = "/test") public interface OrderService { @PostMapping(value = "/getOrder") Mono<OrderInfo> getOrder(@RequestParam String orderCode); }

编写动态代理

根据注解所在类,使用反射获取信息,调用业务处理方法
这里使用JDK代理方式

创建代理接口

复制代码
1
2
3
4
5
6
public interface ReactiveFeignClientProxy { // 因为WebClient调用需要负载,所以需要将负载通过自动注入传进来 <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction); }

编写代理实现

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@Slf4j public class ReactiveFeignClientProxyCreator implements ReactiveFeignClientProxy { /** * 代理实例化 * * @param type * @param lbFunction 需要传入LoadBalance负载均衡实例 * @param <T> * @return */ @Override public <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction) { log.info("ReactiveFeignClientProxyCreator.createProxy: {}", type); // 构建WebClient客户端 SmartWebClient smartWebClient = new SmartWebClient(lbFunction); // 获取注解信息 ReactiveFeignClient annotationClass = type.getAnnotation(ReactiveFeignClient.class); String serviceName = annotationClass.name(); String serviceBasePath = annotationClass.path(); Class fallbackClass = annotationClass.fallback(); // 创建代理方法 Object obj = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, (proxy, method, args) -> { // 根据方法和参数得到调用服务器信息 String url = "http://" + serviceName + serviceBasePath; // TODO 这里我是封装了对象,多个变量信息也是一样的,参考逻辑即可 WebClientRestInfo<T> webClientRestInfo = new WebClientRestInfo<T>(); webClientRestInfo.setHttpMethod(HttpMethod.GET); webClientRestInfo.setUrl(url); // 得到请求URL和请求方法 contractUrlAndMethod(method, webClientRestInfo); // 得到调用的参数和body contractRequestParams(method, args, webClientRestInfo); // 返回flux还是mono // isAssignableFrom 判断类型是否是某个类的子类 instanceof 判断实例是否是某个类的子类 boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class); webClientRestInfo.setFlux(isFlux); // 得到返回对象的实际类型 Class<T> elementType = extractElementType(method.getGenericReturnType()); webClientRestInfo.setResultType(elementType); // 调用rest 注意根据类型分别调用Mono和Flux两种请求处理 CorePublisher<T> webClientResult = null; if (isFlux) { // TODO 第二个参数为判断响应状态后的处理Function webClientResult = smartWebClient.restFlux(webClientRestInfo, resp -> { log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName()); return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail.")); }).onErrorResume(Exception.class, throwable -> { // 失败时调用方法 onErrorResume可以使调用失败后按照自定义处理并继续返回响应式结果 return (Flux<T>) fallback(fallbackClass, type, method, args); }); } else { // 调用rest 同上 webClientResult = smartWebClient.restMono(webClientRestInfo, resp -> { log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName()); return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail.")); }).onErrorResume(Exception.class, throwable -> { // 失败时调用方法 return (Mono<T>) fallback(fallbackClass, type, method, args); }); } // 返回调用结果 return webClientResult; }); return (T) obj; } /** * 得到缺省类型的实际类型 * * @param genericReturnType * @return */ private <T> Class<T> extractElementType(Type genericReturnType) { Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments(); for (Type t : actualTypeArguments) { if (t instanceof Class) { return (Class<T>) t; } else { Type[] aaa = ((ParameterizedType) t).getActualTypeArguments(); return (Class<T>) ((ParameterizedType) t).getRawType(); } } return (Class<T>) actualTypeArguments[0]; } /** * 得到请求URL和请求方法 * * @param method * @param webClientRestInfo */ private void contractUrlAndMethod(Method method, WebClientRestInfo webClientRestInfo) { String url = webClientRestInfo.getUrl(); Annotation[] annotationsMethod = method.getAnnotations(); for (Annotation annotation : annotationsMethod) { // GET if (annotation instanceof GetMapping) { GetMapping a = (GetMapping) annotation; url += a.value()[0]; webClientRestInfo.setHttpMethod(HttpMethod.GET); } // POST else if (annotation instanceof PostMapping) { PostMapping a = (PostMapping) annotation; url += a.value()[0]; webClientRestInfo.setHttpMethod(HttpMethod.POST); } // DELETE else if (annotation instanceof DeleteMapping) { DeleteMapping a = (DeleteMapping) annotation; url += a.value()[0]; webClientRestInfo.setHttpMethod(HttpMethod.DELETE); } // PUT else if (annotation instanceof PutMapping) { PutMapping a = (PutMapping) annotation; url += a.value()[0]; webClientRestInfo.setHttpMethod(HttpMethod.PUT); } } webClientRestInfo.setUrl(url); } /** * 得到调用的参数和body * * @param method * @param args * @param webClientRestInfo */ private void contractRequestParams(Method method, Object[] args, WebClientRestInfo webClientRestInfo) { // 参数和值对应的map Map<String, Object> params = new LinkedHashMap<>(); MultiValueMap<String, Object> formValue = new LinkedMultiValueMap<>(); // 得到调用的参数和body Parameter[] parameters = method.getParameters(); for (int i = 0; i < parameters.length; i++) { // 是否带 @PathVariable注解 PathVariable annoPath = parameters[i].getAnnotation(PathVariable.class); if (annoPath != null) { params.put(annoPath.value(), args[i]); } // 是否带了 RequestParam RequestParam annoParam = parameters[i].getAnnotation(RequestParam.class); // 是否带了 RequestBody RequestBody annoBody = parameters[i].getAnnotation(RequestBody.class); if (annoParam != null || annoBody != null) { formValue.add(parameters[i].getName(), args[i]); } } webClientRestInfo.setPathVariable(params); webClientRestInfo.setFormValues(formValue); } /** * 调用fallback方法 * * @param fallbackClass * @param proxyType * @param method * @param args * @param <T> * @return */ private <T> Object fallback(Class fallbackClass, Class<T> proxyType, Method method, Object[] args) { // 失败时调用方法 try { return fallbackClass.getMethod( method.getName(), method.getParameterTypes() ).invoke(fallbackClass.newInstance(), args); } catch (NoSuchMethodException e) { log.error("未找到{}.{}方法的Fallback", proxyType.getSimpleName(), method.getName()); } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { log.error("实例化{}的FallbackClass失败", proxyType.getSimpleName()); } return Mono.empty(); } }

测试

可用通过自动注入,调用代理获取实例

自动注入获取业务接口实例

复制代码
1
2
3
4
5
6
7
private OrderService orderService; @Resource public void setOrderService (ReactorLoadBalancerExchangeFilterFunction lbFunction) { this.orderService = new ReactiveFeignClientProxyCreator().createProxyObject(OrderService.class, lbFunction); }

方法调用

像Openfeign一样使用,就会根据自定义注解声明的服务名称去nacos中找到服务地址,并通过LoadBalance负载调用服务,然后根据注解声明的接口信息调用的Controller接口

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
// 响应式线程处理 Mono<OrderInfo> orderInfo = orderService.getOrder("order-code"); // 返回结果或者订阅响应处理 orderInfo.subscribe(res -> { //TODO }) // 阻塞式线程处理(不推荐) Mono<OrderInfo> orderInfo = orderService.getOrder("order-code"); // block可以阻塞等待获取结果 OrderInfo orderInfo = orderInfo.block();

欢迎留言指正、讨论

最后

以上就是懵懂可乐最近收集整理的关于Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现背景开搞测试欢迎留言指正、讨论的全部内容,更多相关Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(88)

评论列表共有 0 条评论

立即
投稿
返回
顶部