我是靠谱客的博主 懵懂可乐,这篇文章主要介绍Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现背景开搞测试欢迎留言指正、讨论,现在分享给大家,希望可以做个参考。

Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现

  • 背景
  • 开搞
    • 项目pom.xml
    • 调用处理方法
    • 自定义注解(类~@FeignClient)
    • 使用注解定义服务调用方法
    • 编写动态代理
      • 创建代理接口
      • 编写代理实现
  • 测试
    • 自动注入获取业务接口实例
    • 方法调用
  • 欢迎留言指正、讨论

经验总结,如有错误请指正!

背景

Spring Gateway v3.0.1为响应式框架,但是在微服务中避免不了微服务之间相互调用,但微服务大部分都是Spring Boot开发的,属于阻塞式框架,所以这种情况就属于响应式程序调用阻塞式程序提供的接口;

OpenFeign是阻塞式的,所以没办法在Gateway中直接使用,官方文档对响应式支持这样写的;
Spring Cloud Openfeign | Reactive Support
在这里插入图片描述
意思就是有个 feign-reactive 的框架可以支持,这个框架是PlaytikaOSS根据OpenFeign开发的一个框架;

但是我没有使用,我想自己封装一下,然后就可以实现像OpenFeign那样调用就可以了,也是增长一下水平,响应式框架的Http客户端有WebClient,可以基于WebClient调用封装一个类似OpenFeign的小实现(参考Openfeign)

开搞

项目pom.xml

本次介绍的封装实现主要是依赖nacos、gateway、loadbalancer,如下所示,其余的按需自行增配

<!--nacos-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--spring cloud gateway-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-commons</artifactId>
</dependency>
<!--客户端负载均衡loadbalancer-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<!-- loadbalancer 缓存优化 -->
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>

调用处理方法

先编写实际处理业务接口调用的WebClient实现方法,这里写了一个简单的工具类

@Slf4j
@Component
public class SmartWebClient {

    private final WebClient smartWebClient;

	// ReactorLoadBalancerExchangeFilterFunction是LoadBalance的一个负载
	// 可以适配nacos的服务发现,将其注入到WebClient即可实现服务名称调用及负载
    public SmartWebClient(ReactorLoadBalancerExchangeFilterFunction lbFunction) {
        this.smartWebClient = WebClient.builder()
                .filter(lbFunction)
                // 默认请求头
                .defaultHeader(WEB_CLIENT_HEADER, WEB_CLIENT_HEADER_VALUE)
                .build();
    }

	// TODO 调用方法,响应Mono类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
    public <T> Mono<T> restMono(HttpMethod httpMethod, String url,
                            Map<String, String> params, MultiValueMap<String, String> body,
                            MediaType reqMediaType, MediaType respMediaType,
                            Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
        return smartWebClient
                .method(httpMethod) // post get put ……
                .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
                .accept(respMediaType) // contentType
                .contentType(reqMediaType) // contentType
                // 这里可以放入BodyInserters.from……多种方法,按需使用
                .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
                .retrieve() // 发送请求
                // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
                .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) 
                // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
                .bodyToMono(resultType);
    }
    
	// TODO 调用方法,响应Flux类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
    public <T> Flux<T> restFlux(HttpMethod httpMethod, String url,
                            Map<String, String> params, MultiValueMap<String, String> body,
                            MediaType reqMediaType, MediaType respMediaType,
                            Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
        return smartWebClient
                .method(httpMethod) // post get put ……
                .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
                .accept(respMediaType) // contentType
                .contentType(reqMediaType) // contentType
                // 这里可以放入BodyInserters.from……多种方法,按需使用
                .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
                .retrieve() // 发送请求
                // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
                .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) 
                // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
                .bodyToFlux(resultType);
    }

}

实体

@Data
public class WebClientRestInfo<T> {

    /**
     * 请求方式
     */
    private HttpMethod httpMethod = HttpMethod.GET;

    /**
     * 请求地址
     */
    private String url;

    /**
     * 请求path参数
     */
    private Map<String, Object> pathVariable;

    /**
     * 请求表单/Body
     */
    private MultiValueMap<String, Object> formValues;

    /**
     * 编码
     */
    private MediaType reqMediaType = MediaType.APPLICATION_JSON;
    private MediaType respMediaType = MediaType.APPLICATION_JSON;

    /**
     * 是否为Flux
     */
    private boolean isFlux = false;

    /**
     * 返回值类型
     */
    private Class<T> resultType;
}

自定义注解(类~@FeignClient)

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface ReactiveFeignClient {

    String name();

    String path();

    Class<?> fallback() default void.class;
}

使用注解定义服务调用方法

和Openfeign一样,从其他服务的controller参考创建服务接口即可,实体在服务内创建即可,但返回结果需要加上Mono(单个结果)或者Flux(多个结果),这是响应式编程必要的

@ReactiveFeignClient(name = "/order-service", path = "/test")
public interface OrderService {

    @PostMapping(value = "/getOrder")
    Mono<OrderInfo> getOrder(@RequestParam String orderCode);
}	

编写动态代理

根据注解所在类,使用反射获取信息,调用业务处理方法
这里使用JDK代理方式

创建代理接口

public interface ReactiveFeignClientProxy {

	// 因为WebClient调用需要负载,所以需要将负载通过自动注入传进来
    <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction);
}

编写代理实现

@Slf4j
public class ReactiveFeignClientProxyCreator implements ReactiveFeignClientProxy {

    /**
     * 代理实例化
     *
     * @param type
     * @param lbFunction 需要传入LoadBalance负载均衡实例
     * @param <T>
     * @return
     */
    @Override
    public <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction) {
        log.info("ReactiveFeignClientProxyCreator.createProxy: {}", type);
        // 构建WebClient客户端
        SmartWebClient smartWebClient = new SmartWebClient(lbFunction);

        // 获取注解信息
        ReactiveFeignClient annotationClass = type.getAnnotation(ReactiveFeignClient.class);
        String serviceName = annotationClass.name();
        String serviceBasePath = annotationClass.path();
        Class fallbackClass = annotationClass.fallback();

        // 创建代理方法
        Object obj = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, (proxy, method, args) -> {
            // 根据方法和参数得到调用服务器信息
            String url = "http://" + serviceName + serviceBasePath;
            
            // TODO 这里我是封装了对象,多个变量信息也是一样的,参考逻辑即可
            WebClientRestInfo<T> webClientRestInfo = new WebClientRestInfo<T>();
            webClientRestInfo.setHttpMethod(HttpMethod.GET);
            webClientRestInfo.setUrl(url);
            // 得到请求URL和请求方法
            contractUrlAndMethod(method, webClientRestInfo);
            // 得到调用的参数和body
            contractRequestParams(method, args, webClientRestInfo);

            // 返回flux还是mono
            // isAssignableFrom 判断类型是否是某个类的子类 instanceof 判断实例是否是某个类的子类
            boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
            webClientRestInfo.setFlux(isFlux);

            // 得到返回对象的实际类型
            Class<T> elementType = extractElementType(method.getGenericReturnType());
            webClientRestInfo.setResultType(elementType);

            // 调用rest 注意根据类型分别调用Mono和Flux两种请求处理
            CorePublisher<T> webClientResult = null;
            if (isFlux) {
            	// TODO 第二个参数为判断响应状态后的处理Function
                webClientResult = smartWebClient.restFlux(webClientRestInfo, resp -> {
                    log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
                    return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
                }).onErrorResume(Exception.class, throwable -> {
                    // 失败时调用方法 onErrorResume可以使调用失败后按照自定义处理并继续返回响应式结果
                    return (Flux<T>) fallback(fallbackClass, type, method, args);
                });
            } else {
                // 调用rest 同上
                webClientResult = smartWebClient.restMono(webClientRestInfo, resp -> {
                    log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
                    return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
                }).onErrorResume(Exception.class, throwable -> {
                    // 失败时调用方法
                    return (Mono<T>) fallback(fallbackClass, type, method, args);
                });
            }

            // 返回调用结果
            return webClientResult;
        });

        return (T) obj;
    }

    /**
     * 得到缺省类型的实际类型
     *
     * @param genericReturnType
     * @return
     */
    private <T> Class<T> extractElementType(Type genericReturnType) {
        Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
        for (Type t : actualTypeArguments) {
            if (t instanceof Class) {
                return (Class<T>) t;
            } else {
                Type[] aaa = ((ParameterizedType) t).getActualTypeArguments();
                return (Class<T>) ((ParameterizedType) t).getRawType();
            }
        }
        return (Class<T>) actualTypeArguments[0];
    }

    /**
     * 得到请求URL和请求方法
     *
     * @param method
     * @param webClientRestInfo
     */
    private void contractUrlAndMethod(Method method, WebClientRestInfo webClientRestInfo) {
        String url = webClientRestInfo.getUrl();
        Annotation[] annotationsMethod = method.getAnnotations();
        for (Annotation annotation : annotationsMethod) {
            // GET
            if (annotation instanceof GetMapping) {
                GetMapping a = (GetMapping) annotation;
                url += a.value()[0];
                webClientRestInfo.setHttpMethod(HttpMethod.GET);
            }
            // POST
            else if (annotation instanceof PostMapping) {
                PostMapping a = (PostMapping) annotation;
                url += a.value()[0];
                webClientRestInfo.setHttpMethod(HttpMethod.POST);
            }
            // DELETE
            else if (annotation instanceof DeleteMapping) {
                DeleteMapping a = (DeleteMapping) annotation;
                url += a.value()[0];
                webClientRestInfo.setHttpMethod(HttpMethod.DELETE);
            }
            // PUT
            else if (annotation instanceof PutMapping) {
                PutMapping a = (PutMapping) annotation;
                url += a.value()[0];
                webClientRestInfo.setHttpMethod(HttpMethod.PUT);
            }
        }
        webClientRestInfo.setUrl(url);
    }

    /**
     * 得到调用的参数和body
     *
     * @param method
     * @param args
     * @param webClientRestInfo
     */
    private void contractRequestParams(Method method, Object[] args, WebClientRestInfo webClientRestInfo) {
        // 参数和值对应的map
        Map<String, Object> params = new LinkedHashMap<>();
        MultiValueMap<String, Object> formValue = new LinkedMultiValueMap<>();

        // 得到调用的参数和body
        Parameter[] parameters = method.getParameters();
        for (int i = 0; i < parameters.length; i++) {
            // 是否带 @PathVariable注解
            PathVariable annoPath = parameters[i].getAnnotation(PathVariable.class);
            if (annoPath != null) {
                params.put(annoPath.value(), args[i]);
            }

            // 是否带了 RequestParam
            RequestParam annoParam = parameters[i].getAnnotation(RequestParam.class);
            // 是否带了 RequestBody
            RequestBody annoBody = parameters[i].getAnnotation(RequestBody.class);
            if (annoParam != null || annoBody != null) {
                formValue.add(parameters[i].getName(), args[i]);
            }
        }
        webClientRestInfo.setPathVariable(params);
        webClientRestInfo.setFormValues(formValue);
    }

    /**
     * 调用fallback方法
     *
     * @param fallbackClass
     * @param proxyType
     * @param method
     * @param args
     * @param <T>
     * @return
     */
    private <T> Object fallback(Class fallbackClass, Class<T> proxyType,
                                Method method, Object[] args) {
        // 失败时调用方法
        try {
            return fallbackClass.getMethod(
                    method.getName(),
                    method.getParameterTypes()
            ).invoke(fallbackClass.newInstance(), args);
        } catch (NoSuchMethodException e) {
            log.error("未找到{}.{}方法的Fallback", proxyType.getSimpleName(), method.getName());
        } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
            log.error("实例化{}的FallbackClass失败", proxyType.getSimpleName());
        }
        return Mono.empty();
    }
}

测试

可用通过自动注入,调用代理获取实例

自动注入获取业务接口实例

private OrderService orderService;

@Resource
public void setOrderService (ReactorLoadBalancerExchangeFilterFunction lbFunction) {
    this.orderService = new ReactiveFeignClientProxyCreator().createProxyObject(OrderService.class, lbFunction);
}

方法调用

像Openfeign一样使用,就会根据自定义注解声明的服务名称去nacos中找到服务地址,并通过LoadBalance负载调用服务,然后根据注解声明的接口信息调用的Controller接口

// 响应式线程处理
Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
// 返回结果或者订阅响应处理
orderInfo.subscribe(res -> {
//TODO
})

// 阻塞式线程处理(不推荐)
Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
// block可以阻塞等待获取结果
OrderInfo orderInfo = orderInfo.block();

欢迎留言指正、讨论

最后

以上就是懵懂可乐最近收集整理的关于Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现背景开搞测试欢迎留言指正、讨论的全部内容,更多相关Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(153)

评论列表共有 0 条评论

立即
投稿
返回
顶部