我是靠谱客的博主 明亮夏天,最近开发中收集的这篇文章主要介绍十五、Spring cloud 消息总线(Bus),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、回顾 Spring 事件/监听器

  • Spring 事件
    • ApplicationEvent
  • Spring 监听器
    • ApplicationListener/@EventListener
  • Spring 事件发布器
    • ApplicationEventPublisher

Demo:

/**
 * Spring 事件/监听器 Demo
 * @author 咸鱼
 * @date 2018/11/29 18:58
 */
public class SpringEventDemo {
    public static void main(String[] args) {
        //创建 Annotation 驱动 Spring 应用上下文
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();

        //注册 EventConfiguration 到 Spring 应用上下文
        context.register(EventConfiguration.class);

        //启动 Spring 应用上下文
        context.refresh();

        //AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现
        ApplicationEventPublisher publisher = context;

        //发布一个
        publisher.publishEvent(new MyApplicationEvent("hello java"));
    }

    /**
     * 自定义事件
     */
    private static class  MyApplicationEvent extends ApplicationEvent{

        private static final long serialVersionUID = -5938169569156916456L;

        public MyApplicationEvent(String message) {
            super(message);
        }
    }

    /**
     * 监听事件
     */
    @Configuration
    public static class EventConfiguration{

        /**
         * 监听 {@link MyApplicationEvent}
         * @param event
         */
        @EventListener
        public void onEvent(MyApplicationEvent event){
            System.out.println("监听到事件:" + event);
        }
    }
}

二、Spring Cloud Bus

(一)使用场景
  用于广播应用状态变更到分布式系统中的各个关联的节点。应用节点不直接相互通讯,而通过消息总线来实现通知。

  简单点说:比如有一个配置服务器,有多个配置客户端。以前没用消息总线之前,一旦配置服务器的配置项发生改变,那么需要由每个配置客户端调用 POST请求/actuator/refresh 才能刷新本地配置项。这样带来的问题是,一旦系统越来越大,那么若要改变配置项,则需要大量的配置客户端手动刷新本地配置项。而引入了消息总线以后,则由消息总线来通知各个客户端配置项发生改变了,并触发刷新本地配置项操作。

(二)架构:
Spring Cloud Bus 架构图

  架构解析:
  客户端发送 POST请求 /actuator/bus-refresh/${contextId}:*,消息总线会根据请求生成一个事件(E1),并将该事件发送给消息中间件(比如 Kafka)。此时会有两种情况:单点传播 和 集群传播。这主要适用于同样的应用会有多个同样的实例(这些实例靠端口进行区分,但 serviceId 都是一样的)。单点传播就是只向其中的一个实例传播,集群传播向所有的实例传播。
  在消息中间件在接收消息时,所有的应用同时也在监听这些事件(比如 E1)。在监听到 E1 事件以后,根据上面的规则,消息总线会将 E1 事件包装成不同类型的内部事件(比如 RefreshRemoteApplicationEvent)。然后让目的应用的 EventPublisher 将包装后的事件发布给事件监听器队列,等待下一步的处理。

暂时这么理解,后续如果不对,再进行改正。

(三)默认实现

  • AMQP(Rabbit MQ)
  • Kafka

  现阶段,Spring Cloud Bus 只支持 AMQP(Rabbit MQ) 和 Kafka两个消息中间件。

三、案例(使用 Kafka)

激活总线:
AMQP:spring-cloud-starter-bus-amqp
Kafka:spring-cloud-starter-bus-kafka
spring-cloud-bus

  改造 user-service-client:使用 Kafka 整合 Spring Cloud Bus

(一)增加依赖

		<!-- 整合 Spring Cloud Bus:Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>

(二)总线事件传播

1、事件传播类型

  • 单点传播
    • Endpoint:/actuator/bus-refresh/${applicationContextId}:*(POST请求)
    • 案例:localhost:8080/actuator/bus-refresh/user-service-client:8080
  • 集群传播
    • Endpoint:/actuator/bus-refresh/${applicationContextId}:**(POST请求)
    • 案例:localhost:8080/actuator/bus-refresh/user-service-client:**

  备注1:${applicationContextId}:* 一般是 serviceId:port,而 serviceId 就是在 application.properties 中配置的 spring.application.name=user-service-client 属性。

  备注2:消息总线提供的端点 /actuator/bus-refresh/${applicationContextId}:* 和 Actuator 自带的端点 /actuator/refresh 作用是相同的,都是刷新配置项,区别主要在于:
/actuator/refresh:刷新本地配置项
/actuator/bus-refresh/${applicationContextId}:*:刷新远程应用配置项

  问题:如何定位 Application Context ID?
  通过访问 /actuator/beans 确认当前 Application Context ID(PS:2.0版本的找不着。。。)

2、事件传播监听器

(1)通过日志可知 单点传播/集群传播 监听器均为org.springframework.cloud.bus.event.RefreshListener

public class RefreshListener
		implements ApplicationListener<RefreshRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(RefreshListener.class);

	private ContextRefresher contextRefresher;

	public RefreshListener(ContextRefresher contextRefresher) {
		this.contextRefresher = contextRefresher;
	}

	@Override
	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
		Set<String> keys = contextRefresher.refresh();
		log.info("Received remote refresh request. Keys refreshed " + keys);
	}
}

  RefreshListener 监听事件 RefreshRemoteApplicationEvent

(2)自定义 RefreshRemoteApplicationEvent 事件监听器,监听 总线事件传播

/**
 * @author 咸鱼
 * @date 2018/11/29 20:44
 */
@Configuration
public class BusConfiguration {
    
    @EventListener
    public void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){
        System.out.printf("Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }
}

(三)总线事件跟踪

1、端点:/trace
  默认事件跟踪功能是失效的,需要通过配置项激活:spring.cloud.bus.trace.enabled=true

2、总线内部事件

  • EnvironmentChangeRemoteApplicationEvent
  • RefreshRemoteApplicationEvent
  • AckRemoteApplicationEvent

EnvironmentChangeRemoteApplicationEvent:应用环境变量(env)改变触发该事件,比如执行:POST请求 /actuator/bus-env

RefreshRemoteApplicationEvent:刷新配置项,触发该事件,比如执行:POST请求 /actuator/bus-refresh

3、自定义事件监听器

  我们可以自定义监听器,来监听这两个事件的发生,可以相应的做一些处理,比如:

@Configuration
public class BusConfiguration {

    /**
     * 监听 RefreshRemoteApplicationEvent 事件
     * POST请求 `/actuator/bus-env` 触发
     */
    @EventListener
    public void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){
        System.out.printf("RefreshRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }

    /**
     * 监听 EnvironmentChangeRemoteApplicationEvent 事件
     * POST请求 `/actuator/bus-refresh` 触发
     */
    @EventListener
    public void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event){
        System.out.printf("EnvironmentChangeRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }
}

(四)自定义事件

  • 事件扩展
    • RemoteApplicationEvent
  • 事件扫描
    • @RemoteApplicationEventScan
  • 事件序列化

1、扩展 RemoteApplicationEvent

/**
 * 自定义事件 {@link RemoteApplicationEvent}
 * @author 咸鱼
 * @date 2018/12/1 10:11
 */
public class UserRemoteApplicationEvent extends RemoteApplicationEvent {
    private static final long serialVersionUID = -1624266233141574546L;

    /**
     * 这个默认构造函数必须有,否则无法将 originService 传递到 目标应用中
     */
    private UserRemoteApplicationEvent() {
    }

    public UserRemoteApplicationEvent(User user, String originService,
                                      String destinationService) {
        super(user, originService, destinationService);
    }
}

2、添加 @RemoteApplicationEventScan

/**
 * 注解 @RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class):
 * 扫面自定义事件
 */
@RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class)
@Configuration
public class BusConfiguration {
    /**
     * 监听自定义的 UserRemoteApplicationEvent 事件
     */
    @EventListener
    public void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){
        System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }
}

3、发布 RemoteApplicationEvent

/**
 * Bus 事件 Controller
 * @author 咸鱼
 * @date 2018/12/1 10:09
 */
@RestController
public class BusApplicationEventController implements ApplicationContextAware,ApplicationEventPublisherAware{
    /**
     * 事件发布器(通过实现 ApplicationEventPublisherAware 实现自动装载)
     * 补充: AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现
     */
    private ApplicationEventPublisher eventPublisher;

    /**
     * 应用上下文(通过实现 ApplicationContextAware 实现自动装载)
     */
    private ApplicationContext applicationContext;
    
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
    }

    /**
     * 问题:为什么这里发布的的自定义事件,可以被框架监听到?
     *     因为在 BusAutoConfiguration#acceptLocal() 中,注册了下面的监听器:
     @EventListener(classes = RemoteApplicationEvent.class)
     public void acceptLocal(RemoteApplicationEvent event) {
     if (this.serviceMatcher.isFromSelf(event)
     && !(event instanceof AckRemoteApplicationEvent)) {
     this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
     }
     }
     *     而我们自定义的事件 UserRemoteApplicationEvent 是 RemoteApplicationEvent 的子类,所以我们在
     * 发布自定义事件以后,可以被框架监听到。
     */
    @PostMapping("/bus/event/publish/user")
    public boolean publishUserEvent(@RequestBody User user,
                                    @RequestParam(value = "destination", required = false) String destination) {
        //获取应用id
        String serviceInstanceId = applicationContext.getId();
        //新建 自定义事件 对象
        UserRemoteApplicationEvent event = new UserRemoteApplicationEvent(user, serviceInstanceId, destination);
        //发布事件
        eventPublisher.publishEvent(event);
        return true;
    }
}

4、监听 RemoteApplicationEvent

/**
     * 监听自定义的 UserRemoteApplicationEvent 事件
     */
    @EventListener
    public void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){
        System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }

四、源码分析

(一)BusAutoConfiguration

1、监听 Spring Event(本地事件)

	@EventListener(classes = RemoteApplicationEvent.class)
	public void acceptLocal(RemoteApplicationEvent event) {
		if (this.serviceMatcher.isFromSelf(event)
				&& !(event instanceof AckRemoteApplicationEvent)) {
			this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
		}
	}

  由于 @EventListener 监听 Spring Event,所以事件 RemoteApplicationEvent 属于本地事件,因必然有发布该事件的源头。

2、监听 Stream 事件(远程事件)

@StreamListener(SpringCloudBusClient.INPUT)
	public void acceptRemote(RemoteApplicationEvent event) {
		if (event instanceof AckRemoteApplicationEvent) {
			if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
					&& this.applicationEventPublisher != null) {
				this.applicationEventPublisher.publishEvent(event);
			}
			// If it's an ACK we are finished processing at this point
			return;
		}
		if (this.serviceMatcher.isForSelf(event)
				&& this.applicationEventPublisher != null) {
			if (!this.serviceMatcher.isFromSelf(event)) {
				this.applicationEventPublisher.publishEvent(event);
			}
			if (this.bus.getAck().isEnabled()) {
				AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
						this.serviceMatcher.getServiceId(),
						this.bus.getAck().getDestinationService(),
						event.getDestinationService(), event.getId(), event.getClass());
				this.cloudBusOutboundChannel
						.send(MessageBuilder.withPayload(ack).build());
				this.applicationEventPublisher.publishEvent(ack);
			}
		}
		if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
			// We are set to register sent events so publish it for local consumption,
			// irrespective of the origin
			this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
					event.getOriginService(), event.getDestinationService(),
					event.getId(), event.getClass()));
		}
	}

acceptRemote() 监听 Stream 事件,同时发送 Spring Event(本地事件)

  ServiceMatcher#isForSelf() 用于匹配 RemoteApplicationEvent 是否为当前应用实例而来。

this.serviceMatcher.isForSelf(event)

  ServiceMatcher#isFromSelf() 用于判断当前事件是否为自己发送。

this.serviceMatcher.isFromSelf(event)

3、整体流程

  假设 user-service-client:8080 执行 /actuator/bus-refresh 端口,发送一个 RefreshRemoteApplicationEvent 事件:

curl -X POST http://localhost:8080/bus-refresh/user-service-client:8080
  • user-service-client:8080:Bus事件的发布者、监听者
  • user-service-client:8081:Bus事件的监听者
  • user-service-client:8080:Bus事件的监听者

  当 Stream Binder 接收到发布者 RefreshRemoteApplicationEvent 事件,广播该事件到所有的监听者:

  • user-service-client:8080:判断事件不是为自己发送,发布 SentApplicationEvent 事件(主要发布到 /trace 中)
  • user-service-client:8081:判断事件不是为自己发送,发布 SentApplicationEvent 事件(主要发布到 /trace 中)
  • user-service-client:8082:判断事件是为自己发送,执行 RefreshRemoteApplicationEvent 事件监听。如果 ack 激活的(默认激活),cloudBusOutboundChannel 会发送 AckRemoteApplicationEvent 到管道里。可以由以下代码监听:
	/**
     * 监听 AckRemoteApplicationEvent 事件
     */
    @StreamListener(SpringCloudBusClient.OUTPUT)
    public void onAckRemoteApplicationEvent(AckRemoteApplicationEvent event) {
        System.out.printf("AckRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %sn",
                event.getSource(), event.getOriginService(), event.getDestinationService());
    }

最后

以上就是明亮夏天为你收集整理的十五、Spring cloud 消息总线(Bus)的全部内容,希望文章能够帮你解决十五、Spring cloud 消息总线(Bus)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部