消息队列(二)--- RocketMQ-NameServer阅读
阅读原文时间:2023年07月12日阅读:3

  所有 broker 在启动的时候都会向 NameServer 进行注册,并定时向它发送心跳包。

我们先从 NamesrvStartup 这个类开始分析。


public static void main(String\[\] args) {  
    main0(args);  
}

public static NamesrvController main0(String\[\] args) {

    try {  
        //创建NamesrvController  
        NamesrvController controller = createNamesrvController(args);  
        //启动  
        start(controller);  
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {  
        e.printStackTrace();  
        System.exit(-1);  
    }

    return null;  
}

看一下 NamesrvController 这个类有什么东西

// Excerpt of the NameServer controller: it groups configuration objects,
// manager objects and thread pools. Members elided by the article are marked "...".
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

// NameServer configuration
private final NamesrvConfig namesrvConfig;

// Netty server configuration
private final NettyServerConfig nettyServerConfig;

// Single-threaded scheduled executor; its threads are created by a ThreadFactoryImpl named "NSScheduledThread"
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(  
    "NSScheduledThread"));  
private final KVConfigManager kvConfigManager;  
// Route-related manager
private final RouteInfoManager routeInfoManager;  
// Remoting server (presumably the network endpoint brokers and clients talk to -- confirm against the elided code)
private RemotingServer remotingServer;

private BrokerHousekeepingService brokerHousekeepingService;

// Thread pool
private ExecutorService remotingExecutor;

private Configuration configuration;  
private FileWatchService fileWatchService;

...  

}

主要是 配置 + manager + 线程池


我们先来看一下 RouteInfoManager .

public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//过期时间
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

//读写锁(适合读多写少), 读的时候可以获取写锁吗?  
//一个线程获取读锁的时候,另外一个线程是否可以获取到写锁  
//答 : 可以  
private final ReadWriteLock lock = new ReentrantReadWriteLock();

//主题对应信息  
private final HashMap<String/\* topic \*/, List<QueueData>> topicQueueTable;

//Broker 基础信息,包括所有的 broker  
private final HashMap<String/\* brokerName \*/, BrokerData> brokerAddrTable;

//Broker集群信息,存储集群中所有broker名称(注意是集群中!!)  
private final HashMap<String/\* clusterName \*/, Set<String/\* brokerName \*/>> clusterAddrTable;

//存活的 Broker  
//收到心跳包时会更新信息会激活,key 是 ip 地址  
private final HashMap<String/\* brokerAddr \*/, BrokerLiveInfo> brokerLiveTable;

//用于过滤  
private final HashMap<String/\* brokerAddr \*/, List<String>/\* Filter Server \*/> filterServerTable;

...  
...

}

  上面的字段可以通过下面两张图来理解。

路由注册的逻辑在 BrokerController 的 start 方法内:


    // Register once right away, then keep re-registering on a fixed schedule.
    this.registerBrokerAll(true, false);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // Register this broker with the NameServer; the same call acts as the heartbeat
                BrokerController.this.registerBrokerAll(true, false);
            } catch (Throwable e) {
                // Catch everything: an exception escaping run() would cancel the periodic task
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS); // initial delay 10 s, period 30 s

registerBrokerAll 方法

/**
 * Registers this broker with the NameServer(s) and applies what they send back.
 *
 * <p>If the broker is not both writable and readable, the topic table sent with
 * the registration is replaced by a copy in which every topic carries the
 * broker-level permission.
 *
 * @param checkOrderConfig when {@code true}, the KV table from the registration result
 *                         is pushed into the topic config manager's order-topic config
 * @param oneway           forwarded to {@code brokerOuterAPI.registerBrokerAll}
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
    final TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    final boolean writableAndReadable =
        PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());

    if (!writableAndReadable) {
        // Advertise every topic with the broker-level permission instead of its own.
        final ConcurrentHashMap<String, TopicConfig> restricted = new ConcurrentHashMap<String, TopicConfig>();
        for (final TopicConfig source : wrapper.getTopicConfigTable().values()) {
            final TopicConfig masked = new TopicConfig(
                source.getTopicName(),
                source.getReadQueueNums(),
                source.getWriteQueueNums(),
                this.brokerConfig.getBrokerPermission());
            restricted.put(source.getTopicName(), masked);
        }
        wrapper.setTopicConfigTable(restricted);
    }

    // Perform the actual registration
    final RegisterBrokerResult outcome = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        wrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills());

    // Nothing to apply when no result came back.
    if (outcome == null) {
        return;
    }

    if (this.updateMasterHAServerAddrPeriodically) {
        if (outcome.getHaServerAddr() != null) {
            this.messageStore.updateHaMasterAddress(outcome.getHaServerAddr());
        }
    }

    this.slaveSynchronize.setMasterAddr(outcome.getMasterAddr());

    if (checkOrderConfig) {
        this.getTopicConfigManager().updateOrderTopicConfig(outcome.getKvTable());
    }
}

brokerOuterAPI 的 registerBrokerAll 方法

/\*\*  
 \* broker 向 NS 的注册方法  
 \*/  
public RegisterBrokerResult registerBrokerAll(  
    final String clusterName,  
    final String brokerAddr,  
    final String brokerName,  
    final long brokerId,  
    final String haServerAddr,  
    final TopicConfigSerializeWrapper topicConfigWrapper,  
    final List<String> filterServerList,  
    final boolean oneway,  
    final int timeoutMills) {  
    RegisterBrokerResult registerBrokerResult = null;

    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();  
    if (nameServerAddressList != null) {  
        for (String namesrvAddr : nameServerAddressList) {  
            try {  
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,  
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);  
                if (result != null) {  
                    registerBrokerResult = result;  
                }

                log.info("register broker to name server {} OK", namesrvAddr);  
            } catch (Exception e) {  
                log.warn("registerBroker Exception, {}", namesrvAddr, e);  
            }  
        }  
    }

    return registerBrokerResult;  
}

private RegisterBrokerResult registerBroker(  
    final String namesrvAddr,  
    final String clusterName,  
    final String brokerAddr,  
    final String brokerName,  
    final long brokerId,  
    final String haServerAddr,  
    final TopicConfigSerializeWrapper topicConfigWrapper,  
    final List<String> filterServerList,  
    final boolean oneway,  
    final int timeoutMills  
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,  
    InterruptedException {  
    //No.1 封装请求头和请求body  
    RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();  
    requestHeader.setBrokerAddr(brokerAddr);  
    requestHeader.setBrokerId(brokerId);  
    requestHeader.setBrokerName(brokerName);  
    requestHeader.setClusterName(clusterName);  
    requestHeader.setHaServerAddr(haServerAddr);  
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER\_BROKER, requestHeader);

    RegisterBrokerBody requestBody = new RegisterBrokerBody();  
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);  
    requestBody.setFilterServerList(filterServerList);  
    request.setBody(requestBody.encode());

    //No.2 发送  
    if (oneway) {  
        try {  
            //发送,remotingClient  
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);  
        } catch (RemotingTooMuchRequestException e) {  
            // Ignore  
        }  
        return null;  
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);  
    assert response != null;  
    switch (response.getCode()) {  
        case ResponseCode.SUCCESS: {  
            RegisterBrokerResponseHeader responseHeader =  
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);  
            RegisterBrokerResult result = new RegisterBrokerResult();  
            result.setMasterAddr(responseHeader.getMasterAddr());  
            result.setHaServerAddr(responseHeader.getHaServerAddr());  
            if (response.getBody() != null) {  
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));  
            }  
            return result;  
        }  
        default:  
            break;  
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());  
}

  其中 RemotingClient 是个接口,它的结构图如下。具体的子类是 NettyRemotingClient,这个类留着后续分析。NS 处理心跳包的逻辑在 RouteInfoManager 的 registerBroker 方法中,这里不再分析其源码实现(主要是对上述字段保存的对应信息进行增删改)。

路由发现

  NameServer 发现路由变化后不会主动 push 给 producer,而是由 producer 主动到 NS 中去获取。RocketMQ 的路由实体是 TopicRouteData:

public class TopicRouteData extends RemotingSerializable {
//顺序消息配置信息,来自于 kvConfig
private String orderTopicConf;
//多个broker 订阅了某个 topic ,所以一个 topic可能对应着多个 broker
private List queueDatas;
//多个broker 的信息
private List brokerDatas;
//过滤
private HashMap/* Filter Server */> filterServerTable;

...  
...  

}

  NameServer 路由发现实现类:DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,  
    RemotingCommand request) throws RemotingCommandException {  
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);  
    final GetRouteInfoRequestHeader requestHeader =  
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {  
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {  
            String orderTopicConf =  
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE\_ORDER\_TOPIC\_CONFIG,  
                    requestHeader.getTopic());  
            topicRouteData.setOrderTopicConf(orderTopicConf);  
        }

        byte\[\] content = topicRouteData.encode();  
        response.setBody(content);  
        response.setCode(ResponseCode.SUCCESS);  
        response.setRemark(null);  
        return response;  
    }

    response.setCode(ResponseCode.TOPIC\_NOT\_EXIST);  
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()  
        + FAQUrl.suggestTodo(FAQUrl.APPLY\_TOPIC\_URL));  
    return response;  
}

//RouteInfoManager#pickupTopicRouteData 方法 

/**
 * Collects the route data of a topic from the in-memory tables under the read lock.
 *
 * <p>The topic's queue data is looked up first; the broker names found there are
 * then resolved to broker data (copied, so the caller does not share the address
 * map with the table) and each broker address to its filter server list.
 *
 * @param topic topic to look up
 * @return the assembled route data, or {@code null} unless both queue data and at
 *         least one broker entry were found
 */
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        // Acquire the lock BEFORE entering try/finally: if the acquisition is interrupted,
        // the finally block must not call unlock() on a lock this thread never held
        // (that would throw IllegalMonitorStateException and hide the interruption).
        this.lock.readLock().lockInterruptibly();
        try {
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                // Distinct names of the brokers that host this topic
                for (QueueData qd : queueDataList) {
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // Copy the address map so the returned data is detached from the table
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                            .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for code further up the stack
        Thread.currentThread().interrupt();
        log.error("pickupTopicRouteData Exception", e);
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    if (log.isDebugEnabled()) {
        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
    }

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}

  本文总结了 NS 相关的路由管理逻辑。