【RocketMQ】NameServer的启动
阅读原文时间:2023年07月09日阅读:1

NameServer是一个注册中心,Broker在启动时向所有的NameServer注册,生产者Producer和消费者Consumer可以从NameServer中获取所有注册的Broker列表,并从中选取Broker进行消息的发送和消费。

NameServer的启动类是NamesrvStartup,主要做了两件事情:

  1. 调用createNamesrvController方法创建NamesrvControllerNamesrvController是NameServer的核心

  2. 调用start方法,启动NameServer

    public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;
    
    public static void main(String[] args) {
        // 启动入口
        main0(args);
    }
    
    public static NamesrvController main0(String[] args) {
    try {
        // 创建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // 启动nameserver
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    
    return null;
    }

    }

createNamesrvController方法主要是对配置信息进行处理:

  1. 创建NamesrvConfig,从名字可以看出是记录NameServer的相关配置信息

  2. 创建NettyServerConfig,与Netty服务相关的配置信息,默认设置监听端口为9876

  3. 从启动命令中检查是否通过- c指定了配置文件,如果指定了配置文件,从指定的路径中加载文件,并将解析文件将配置保存到NamesrvConfigNettyServerConfig

  4. 校验RocketMQ的主目录是否为空,可以在启动命令中通过-Drocketmq.home.dir=路径指定主目录,也可以在操作系统设置环境变量ROCKETMQ_HOME的方式来指定

  5. 处理日志相关的设置

  6. 创建NamesrvController并返回

    public class NamesrvStartup {
    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
    System.exit(-1);
    return null;
    }
    // Nameserver相关配置
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Netty服务器连接相关配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 设置端口
    nettyServerConfig.setListenPort(9876);
    // 如果启动命令中指定了配置文件
    if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
    // 获取文件
    InputStream in = new BufferedInputStream(new FileInputStream(file));
    properties = new Properties();
    properties.load(in);
    // 解析配置文件
    MixAll.properties2Object(properties, namesrvConfig);
    MixAll.properties2Object(properties, nettyServerConfig);
    namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
        // 如果启动命令中带了-p参数,打印NameServer的相关配置信息
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }
        // 将启动命令中的一些设置记录到namesrvConfig
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
        // 如果RocketMQ主目录获取为空,打印异常信息
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        // 日志相关设置
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // 创建NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    controller.getConfiguration().registerConfig(properties);
    
    return controller;
    }

    }

NameServer的启动主要通过NamesrvController进行,处理逻辑如下:

  1. 调用NamesrvController的initialize函数进行初始化

  2. 注册JVM关闭钩子函数,在JVM关闭的时候,调用NamesrvController的shutdown方法关闭相关资源

  3. 调用NamesrvController的start方法启动NameServer

    public class NamesrvStartup {
    public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
    throw new IllegalArgumentException("NamesrvController is null");
    }
    // 初始化NamesrvController
    boolean initResult = controller.initialize();
    if (!initResult) {
    controller.shutdown();
    System.exit(-3);
    }
    // 注册JVM关闭钩子函数
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() {
    @Override
    public Void call() throws Exception {
    controller.shutdown();
    return null;
    }
    }));
    // 启动nameserver
    controller.start();

        return controller;
    }

    }

NamesrvController

初始化

NamesrvController的初始化方法中主要做了如下操作:

  1. 加载配置信息,主要是从kvConfig.json中加载数据

  2. 创建NettyRemotingServer,用于网络通信

  3. 根据设置的工作线程数量创建netty服务相关线程池

  4. 注册处理器DefaultRequestProcessor,用于处理收到的请求,比如Broker发起的注册请求

  5. 注册用于心跳检测的定时任务,定时扫描处于不活跃状态的Broker并剔除

  6. 注册定时打印KV信息的任务

    public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig; // NameServer相关配置
    
    private final NettyServerConfig nettyServerConfig; // Netty服务相关配置
    
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread")); // 定时执行任务的线程池
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager; // 路由表
    
    private RemotingServer remotingServer; // 远程服务,使用的是NettyRemotingServer
    
    private BrokerHousekeepingService brokerHousekeepingService;
    
    private ExecutorService remotingExecutor; // Netty服务相关线程池
    
    // ...
    
    public boolean initialize() {
        // 加载配置信息
        this.kvConfigManager.load();
        // 创建NettyRemotingServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // 创建netty服务相关线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        // 注册处理器
        this.registerProcessor();
        // 定时任务,扫描Broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // 心跳监测扫描处于不活跃状态的Broker
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // 定时打印KV配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    
    // ....
    
    return true;
    } // 注册处理器 private void registerProcessor() { if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        // 注册DefaultRequestProcessor处理请求
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
    }

    }

启动

在启动方法中,主要是调用了RemotingServer的start方法启动服务,在NamesrvController的初始化方法中可知,使用的实现类是NettyRemotingServer,所以之后会启动Netty服务:

    public void start() throws Exception {
        // 启动Netty服务
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
Netty启动

NettyRemotingServer的start方法中主要是对Netty的一些设置,然后绑定端口并启动服务:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

    @Override
    public void start() {
        // ...

        prepareSharableHandlers();
        // Netty相关设置
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // 设置EventLoopGroup线程组
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // 设置channel类型
                .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 设置端口
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception { // 设置ChannelHandler
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });
        if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
            log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
            // 设置Socket发送缓存区大小
            childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
        }
        if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
            log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
            // 设置Socket接收缓存区大小
            childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
        }
        if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
            log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
            childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
        }

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 绑定端口并启动服务
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
}

参考

丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3