你可能没有看过 RocketMQ 的架构图,没关系,一起来学习一下,RocketMQ 架构图如下:
在 RocketMQ 中,有四个角色:
- Producer:消息的生产者,每个 MQ 中间件都有。
- Consumer:消息的消费者,每个 MQ 中间件都有。
- NameServer:RocketMQ 的路由中心,跟 ZooKeeper 差不多。
- Broker:消息服务器,RocketMQ 的消息全部存储在这里。
Producer 发送消息之前,先从 NameServer 中获取到 Broker 服务器列表,然后根据负载均衡策略选择一台 Broker 发送,消息消费时也是同样的道理。可以说 NameServer 是 RocketMQ 的大脑,想要实现路由分发的功能,那么在 NameServer 必然要维护着 Broker 服务器信息,这中间就会涉及到 Broker 服务器服务状态管理问题,这篇文章就来聊一聊 RocketMQ 是如何做服务状态管理的。
在聊服务状态管理之前,先来讲一讲为何不用 ZooKeeper 来做路由中心?
听闻早期的 RocketMQ 是使用 ZooKeeper 来做路由中心。我们知道 ZooKeeper 功能比较强大,包括自动 Master 选举等,强大的同时部署维护就变得复杂了,但是 ZooKeeper 的很多功能 RocketMQ 并不需要,RocketMQ 只需要一个轻量级的元数据服务器就够了。所以就造了 NameServer 这个轮子。
还有一个原因就是中间件对稳定性要求比较高,使用 ZooKeeper 作为注册和路由中心的话,就依赖了另一个中间件,提高了系统复杂性和维护成本,而 NameServer 只是 RocketMQ 中的一个模块,且只有少量代码,维护起来简单,稳定性也提高了。
好了,说回服务状态管理问题,其实这个并不陌生,在微服务领域有大量的中间件都涉及到了这个问题。对于服务状态管理,一般有两种解决思路。
第一种思路是主动探测,如图:
主动探测是由路由方(比如 NameServer)发起的,每一个被路由方(比如 Broker)需要打开一个端口,然后路由方每隔一段时间(比如 30 秒)探测这些端口是否可用,如果可用就认为服务器正常,否则认为服务不可用,就把服务从列表中删除。
这种方式存在的问题就路由方压力可能过大,如果被路由方部署的实例较多时,那么每次探测的成本会比较高,探测的时间也比较长,可能会导致路由方可能不能正常工作。
第二种思路是心跳模式,如图:
心跳模式不在是路由方发起了,改成被路由方每隔一段时间向路由方发送心跳包,路由方记录被路由方的心跳包,包括服务器IP、上报时间等。每一次上报后,更新对应的信息。路由方启动一个定时器,定期检测当前时间和节点,最近续约时间的差值,如果达到一个阈值(比如说90秒),那么认为这个服务节点不可用。
现在大部分需要服务状态管理的中间件,都采用心跳模式,没有太多的缺陷,也不会对服务器造成多大的压力。在 RocketMQ 中 NameServer 与 Broker 的通信也是采用 心跳模式。
心跳模式中,有上报心跳、保存心跳信息、定时检测这个步骤。我们从上报心跳和定时检测这两个方面,从源码的角度,看看 RocketMQ 是如何实现心跳模式的。
先从上报心跳开始,在 RocketMQ 中,默认情况下,Broker 服务器会每间隔 30秒向集群中的所有 NameServer 发送心跳包。源代码是BrokerController#start()
,如下代码:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
// brokerConfig.getRegisterNameServerPeriod() 默认是 30 秒
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
其中上报心跳的时间用户是可以自定义的,但是不会低于 10秒高于 60秒。当然这只是一个定时器,具体发送心跳包的方法是org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll()
,代码如下:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
// 获取所有 NameServer 服务器
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 构建 broker 信息
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 向 NameServer 逐个上报
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
心跳包发送完之后,就是 NameServer 处理心跳包了,NameServer 会将心跳信息保存起来,保存心跳信息的源代码我就不贴了,涉及的东西比较多,有兴趣的可以查看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest()#RequestCode.REGISTER_BROKER
,一步一步 Debug 就知道保存过程。
来看看最后一个操作定时检测,NameServer 会开启一个探测线程,源代码在org.apache.rocketmq.namesrv.NamesrvController#initialize()
下,代码如下:
// 检测 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
NameServer 每 10秒会发起一次检测。具体检测源代码是org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker()
,代码如下:
/**
* 检测 broker 状态
*/
public void scanNotActiveBroker() {
// 遍历 broker 存活列表
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果最后一次上报时间已经超过两分钟,则移出
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
NameServer 会遍历 Broker 存活列表,如果最后一次发送心跳包的时间超过 120秒,则认为 Broker 服务器不可用,将 Broker 从各种配置列表中移出。
到此为止,RocketMQ 的心跳模式实现就完成了,上面的源代码都是一些粗略的,具体的实现细节还是比较繁琐的,有兴趣的可以深入研究源码,获取更多详细信息。
关于RocketMQ 解决服务状态管理的分享就这些,感谢您的阅读,希望这篇文章对您的学习或者工作有一点帮助。有收获的话,也可以帮忙推荐给其他的小伙伴,让更多的人受益,万分感谢。
最后
目前互联网上很多大佬都有 RocketMQ 相关文章,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。
欢迎关注公众号【互联网平头哥】。这里有职场感悟、Java 技术,虽然不高大上,但通俗易懂。今天最好的是明天最低的要求,愿你我共同进步。
评论前必须登录!
注册