登录
注册
写文章
发现
工具
服务停止过程中MQ还在消费?
_3t3lfz KEKfID
编辑文章
服务停止过程中MQ还在消费?
asfx站长
2023.09.27 16:32:53
阅读
674
最近生产环境收到短信告警说MQ消费失败,原来是那个时候小伙伴在发布服务,但是如果每次发布服务都造成MQ消费失败不是很友好啊,怎么解决这个问题呢?前阵子我们利用Nacos实现了服务的优雅下线,那么能不能在Nacos客户端监听到服务下线,然后关闭MQ消费呢? 这里我们使用的是RocketMQ,查了文档确实有相关的停止方法,现在就差Nacos客户端监听服务变化的方法了,查了下也有,哈哈~ 那么就开始撸代码吧! ``` import com.alibaba.fastjson.JSON; import com.alibaba.nacos.client.naming.event.InstancesChangeEvent; import com.alibaba.nacos.common.notify.Event; import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.utils.CollectionUtils; import com.aliyun.openservices.ons.api.Admin; import com.aliyun.openservices.ons.api.Producer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Map; /** * Nacos服务变化事件监听 * 当服务下线时,关闭RocketMQ消费者;当服务上线时,开启RocketMQ消费者; */ @Slf4j @Component public class NacosServiceChangedListener extends Subscriber<InstancesChangeEvent> { @Value("${dubbo.service.xx.spi.group}") private String spiGroup; @Value("${dubbo.service.xx.spi.version}") private String spiVersion; @Value("${DUBBO_IP_TO_REGISTRY:}") private String dubboRegistryIp; @Value("${project.env:}") private String env; /** * 用来标志Nacos服务是否启动 */ private final AtomicBoolean serviceStarted = new AtomicBoolean(false); @PostConstruct public void registerToNotifyCenter() { NotifyCenter.registerSubscriber(this); } @Override public void onEvent(InstancesChangeEvent event) { // 只关注本应用服务 String serviceNameCheck = String.format("providers:com.xxx.Service:%s:%s", spiVersion, spiGroup); if (StringUtils.isBlank(event.getServiceName()) || !event.getServiceName().endsWith(serviceNameCheck)) { return; } log.info("监听到nacos的服务实例变化:{}", JSON.toJSONString(event)); if (StringUtils.isBlank(dubboRegistryIp)) { dubboRegistryIp = IpUtil.getIp(); } if (!CollectionUtils.isEmpty(event.getHosts()) && event.getHosts().stream().anyMatch(instance -> dubboRegistryIp.equals(instance.getIp()))) { log.info("检测到当前服务启动了,serviceNameCheck={},dubboRegistryIp={}", serviceNameCheck, dubboRegistryIp); if (!serviceStarted.getAndSet(true)) { Map<String, Admin> consumers = SpringContextHolder.getBeansOfType(Admin.class); consumers.forEach((k, v) -> { try { if (!(v instanceof Producer) && !v.isStarted()) { log.info("正在启动RocketMQ消费者:{}", k); v.start(); } } catch (Exception e) { log.error("启动RocketMQ消费者异常,e={}", e); } }); } } else { log.info("检测到当前服务不在线,serviceNameCheck={},dubboRegistryIp={}", serviceNameCheck, dubboRegistryIp); if (serviceStarted.getAndSet(false)) { //从容器中获取 Admin 接口的实例, 关闭它 Map<String, Admin> consumers = SpringContextHolder.getBeansOfType(Admin.class); consumers.forEach((k, v) -> { try { if (!(v instanceof Producer)) { log.info("正在关闭RocketMQ消费者:{}", k); v.shutdown(); } } catch (Exception e) { log.error("关闭RocketMQ消费者异常,e={}", e); } }); } } } @Override public Class<? extends Event> subscribeType() { return InstancesChangeEvent.class; } } ```
我的主页
退出