消息消费到达主服务器后需要将消息同步到从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。
HAService:RocketMQ主从同步核心实现类
HAService$AcceptSocketService:HAMaster端监听客户端连接实现类
HAService$GroupTransferService:主从同步通知实现类
HAService$HAClient:HA Client端实现类
HAConnection:HA Master服务端HA链接对象的封装,与Broker从服务器的网络读写实现类
HAConnection$ReadSocketService:HA MAster网络读实现类
HAConnection$WriteSocketService:HA Master网络写实现类
org.apache.rocketmq.store.ha.HAService#start:
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
1.主服务器启动,并在特定端口上监听从服务器连接。
2.从服务器主动链接主服务器,主服务器接收客户端的连接,并建立相关TCP连接。
3.从服务器主动想主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器。
4.从服务器保存消息并继续发送新的消息同步请求。
class AcceptSocketService extends ServiceThread {
private final SocketAddress socketAddressListen;//Broker服务监套接字
private ServerSocketChannel serverSocketChannel;//服务端socket通道,基于NIO
private Selector selector;//事件选择器,基于NIO
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP\_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
每个连接会创建一个HAConnection对象,该HAConnection江负责M-S数据同步逻辑。
GroupTransferService主从同步阻塞实现,如果是同步主从模式,消息发送者将消息刷写到磁盘后,需要继续等待新数据被传输到从服务器,从服务器数据的复制是在另外一个线程HAConnection中去拉取。
所以消息发送者在这里需要等待数据传输的结果。GroupTransferService就是实现该功能:
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);
}
this.requestsRead.clear();
}
}
}
GroupTransferService的指责是否则当主从同步复制结束后通知由于等待HA同步结果而阻塞的消息发送者线程。判断主从同步是否完成的依据是Slave中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量。如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待1s再次判断,每一个任务在一批任务循环判断5次。消息发送者返回有两种情况:等待超过5s或GroupTransferService通知主从复制完成。
HAClient是主从同步Slave端的核心实现类:
class HAClient extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;//socket读缓存区大小
private final AtomicReference
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);//Slave
private SocketChannel socketChannel;//网络传输通道
private Selector selector;//NIO事件选择器
private long lastWriteTimestamp = System.currentTimeMillis();上次写入时间戳
private long currentReportedOffset = 0;反馈Slave当前的复制进度,commitlog文件最大偏移量
private int dispatchPosition = 0;本次已处理读缓存区的指针。
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ\_MAX\_BUFFER\_SIZE);//读缓冲区,4M
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ\_MAX\_BUFFER\_SIZE);//读缓冲区备份,与BufferRead进行交换
org.apache.rocketmq.store.ha.HAService.HAClient#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection\[" + this.masterAddress
+ "\] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 \* 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 \* 5);
}
}
log.info(this.getServiceName() + " service end");
}
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP\_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
尝试连接master, 建立到Master的tcp链接,然后注册OP_READ,初始化currentReportedOffset为commitlog文件的最大偏移量,lastWriteTimestamp上次写入时间戳为当前时间戳。
在Broker启动时,如果Broker角色为SLAVE时将读取Broker配置文件中的haMasterAddress属性并更新HAClient的masterAddress,如果角色为SLAVE
并且haMasterAddress为空,启动并不会报错,但不回执行主从同步复制,该方法最终返回是否成功连接上Master。
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
判断是否需要向Master反馈当前待拉取偏移量,Master与Slave的HA心跳发送间隔默认为5s,可通过配置haSendHeartbeatInterval来改变心跳间隔。
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
向Master反馈拉取偏移量,2个作用:
1。对于slave来说,是发送下次带拉取消息偏移量。
2.对master,既可以认为是Slave本次请求拉取的消息偏移量,也可以理解为slave的消息同步ack确认消息。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {//如果读取到的字节>0,重制读取到0字节的次数,并更新最后一次写入时间戳,然后调用dispatchReadRequest将读取到的所有消息全部追加到消息内存映射文件中,然后再次反馈拉取进度给服务器
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
处理网络读请求,即处理从Master服务器传回的消息数据。这里给出了一个处理网络读的NIO示例。循环判断readByteBuffer是否还有剩余空间。如果存在剩余空间,则调用SocketChannel#read将通道中的数据读入到读缓存区中。
Master服务器在收到从服务器的连接请求后,会将主从服务器的连接SocketChannel封装成HAConnection对象,实现主服务器与从服务器的读写操作,其网络读请求由其内部类ReadSocketService线程来实现:
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;//网络读缓存区大小
private final Selector selector;//NIO网络事件选择器
private final SocketChannel socketChannel;//网络通道,用于读写的socket通道
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);//网络读写缓存区,默认为为1M
private int processPosition = 0;//当前处理指针
private volatile long lastReadTimestamp = System.currentTimeMillis();//上次读取数据的时间戳
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection\[" + HAConnection.this.clientAddr + "\] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
每隔1s处理一次读就绪事件,
org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
如果byteBufferRead没有剩余空间,说明该position==limit==capacity,调用flip,position=0,limit=capacity设置processPostion=0,表示从头开始处理。
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
NIO网络读的常规方法,一般使用循环的方式进行读写,知道byteBuffer中没有剩余的空间。
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave\[" + HAConnection.this.clientAddr + "\] request offset " + readOffset);
}
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
}
如果读取的字节大于0并且本次读取到的内容大于等于8,表明收到了从服务器一条拉取消息的请求。由于有新的从服务器反馈拉取偏移量,服务端会通知由于同步等待HA复制结果而阻塞的消息发送者线程
else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接处于半关闭状态,返回false则意味着消息服务器将关闭改连接。
HAConnection的写请求由内部内WriteSocketService线程来实现:
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
如果 slaveRequestOffset等于-1,说明Master还未收到从服务器的拉取请求,放弃本次事件处理。slaveRequestOffset在收到从服务器拉取请求时更新。
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
如果nextTransferFromWhere为-1,表示初次进行数据传输,计算待传输的物理偏移量,如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。
if (this.lastWriteOver) {
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
判断上次写事件是否已将信息全部写入客户端。
如果已全部写入,且当前系统时间与上次最后写入的时间间隔大于HA心跳检测时间,则发送一个心跳包,避免长连接由于空闲被关闭,HA心跳包发送间隔通过haSendHeartbeatInterVal放置,默认值为5s。
如果上次数据未写完,则先传输上一次的数据,如果消息还是未全部传输,则结束此次事件处理。
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
传输消息到从服务器
1.根据消息从服务器请求的待拉取偏移量,查找该偏移量以后所有的可读消息,如果未查到匹配的消息,通知所有等待线程继续等待100ms。
2.如果匹配到消息,且查找到的消息总长度大于配置HA传输一次同步任务最大传输的字节数,则通过设置ByteBuffer的limit来控制只传输指定长度的字节,这就意味着HA客户端收到的消息会包含不完整的消息,HA一批次传输消息最大字节通过haTransferBatchSize设置,默认为32K。
HA服务端消息的传输一直一上述步骤循环运行,每次时间处理完成后等待1s。
总结:
acceptSocketService 每次select 1s处理连接时间,每个就绪事件委托给一个connnection对象。读线程读偏移量更新到共享成员变量,写线程轮询检查
haClient 建立连接时想master心跳最新同步偏移量,select 1s可读事件,从channel中读数据到缓冲区,按字节处理,之后将偏移量上报给master,如果超过了心跳时间,关闭连接
groupTransferService,通过CountDownLunch控制 同步阻塞模式下,等待HA同步完成的发送请求
手机扫一扫
移动阅读更方便
你可能感兴趣的文章