## 整体流程
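默认流程:GitLab 的 webhook 在配置更新时调用 config server 的 `/monitor` 端点(由 spring-cloud-config-monitor 提供),触发 `RefreshRemoteApplicationEvent` 事件;Spring Cloud Bus 的 `@EventListener`(`acceptLocal`)监听到本地的 `RemoteApplicationEvent` 后,通过 MQ 将其发布出去;每个 config client 的 `@StreamListener`(`acceptRemote`)从 MQ 接收该 `RemoteApplicationEvent` 并在本地重新发布,最终由 `RefreshListener` 完成 refresh。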
## PropertyPathEndpoint(/monitor 端点)
@RequiredArgsConstructor@RestController@RequestMapping(path = "${spring.cloud.config.monitor.endpoint.path:}/monitor")@CommonsLogpublic class PropertyPathEndpoint implements ApplicationEventPublisherAware, ApplicationContextAware {@RequestMapping(method = RequestMethod.POST) public SetnotifyByPath(@RequestHeader HttpHeaders headers, @RequestBody Map request) { PropertyPathNotification notification = this.extractor.extract(headers, request); if (notification != null) { Set services = new LinkedHashSet<>(); for (String path : notification.getPaths()) { services.addAll(guessServiceName(path)); } if (this.applicationEventPublisher != null) { for (String service : services) { log.info("Refresh for: " + service); this.applicationEventPublisher .publishEvent(new RefreshRemoteApplicationEvent(this, this.contextId, service)); } return services; } } return Collections.emptySet(); }}
## MQ:接收 RefreshRemoteApplicationEvent 并发布 message(BusAutoConfiguration)
/**
 * Forwards a locally published {@link RemoteApplicationEvent} to the bus outbound
 * channel. Only events that the service matcher reports as originating from this
 * instance are sent, and ACK events are never forwarded.
 *
 * @param event the remote application event observed in the local context
 */
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
    // Events that did not originate here are not re-broadcast.
    if (!this.serviceMatcher.isFromSelf(event)) {
        return;
    }
    // ACKs are excluded from forwarding.
    if (event instanceof AckRemoteApplicationEvent) {
        return;
    }
    this.cloudBusOutboundChannel.send(
            MessageBuilder.withPayload(event).build());
}
## 每个 client 的 MQ 接收 message
@StreamListener(SpringCloudBusClient.INPUT) public void acceptRemote(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) { this.applicationEventPublisher.publishEvent(event); } // If it's an ACK we are finished processing at this point return; } if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { if (!this.serviceMatcher.isFromSelf(event)) { this.applicationEventPublisher.publishEvent(event); } if (this.bus.getAck().isEnabled()) { AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // We are set to register sent events so publish it for local consumption, // irrespective of the origin this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } }
## 本地监听 RefreshListener
/**
 * Application listener that reacts to a {@link RefreshRemoteApplicationEvent} by
 * asking the {@link ContextRefresher} to refresh and logging the keys it reports.
 */
public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static final Log log = LogFactory.getLog(RefreshListener.class);

    private final ContextRefresher contextRefresher;

    /**
     * @param contextRefresher the refresher invoked for every received refresh event
     */
    public RefreshListener(ContextRefresher contextRefresher) {
        this.contextRefresher = contextRefresher;
    }

    /**
     * Triggers a refresh and logs the returned keys.
     *
     * @param event the received refresh event (its contents are not inspected here)
     */
    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        Set<String> keys = contextRefresher.refresh();
        log.info("Received remote refresh request. Keys refreshed " + keys);
    }
}
## docs