MySQL事务的提交采用两阶段提交协议, 前些日子和同事聊的时候发现对提交的细节还是有些模糊,这里对照MySQL源码详细记录一下,版本是MySQL5.7.36。
一. 事务的提交流程。
1. 获取 MDL_key::COMMIT 锁: FTWRL会阻塞 commit 操作。
-------------------------------- 接下来进入 prepare 阶段:
2. binlog prepare: 将上一次 commit 队列中的最大的 seq_no 写入本次事务的 last_commit 中。【用户发起的显式提交(显式commit), 或者DDL发起的隐式提交没有这一步】
3. innodb prepare:
3.1 获取线程的 xid, 设置为事务的 xid.
3.2 修改Innodb中事务的状态为 Prepare 状态
3.3 将 undo 日志端从 active 设置为 prepare 状态, 并在undo段的第一个undo的last undo log header中写入xid.
-------------------------------- 接下来进入 commit 阶段:
4. XID_Event生成并写入binlog cache中。会首先将事务中的写操作生成的event flush到binlog cache中, 再写入 xid event。这也符合binlog中事务event顺序。
5. binlog flush:
5.1. 当前线程加入flush队列, 如果flush队列是空的, 则当前为leader; 如果为非空, 则为 follower, 非leader线程将被阻塞, 直到commit之后被leader线程唤醒, 完成提交。
5.2. 获取Lock log锁
5.3. 对flush队列进行fetch, 本次处理的队列就固定了
5.4. 在innodb存储引擎中flush redo log, 做 innodb层redo持久化
5.5. 为flush队列中每个事务生成gtid
5.6. 将flush队列中每个线程的binlog cache flush到 binlog文件中, 这里包含三步:
a. 直接将 GTID event 写入 binlog磁盘文件中
b. 将事务生成的别的 event 写入 binlog file cache中
c. 将 binlog cache[IO cache] flush到文件
5.7. 判断 binlog 是否需要切换, 设置切换标记。注意:这里是将事务的event写入binlog file cache后再判断, 因此一个事务的binlog都位于同一个binlog文件中
5.8. after_flush hook:如果sync_binlog != 1, 那么在这里更新 binlog 点位, 通知dump线程向从节点发送 event
6. binlog sync:
6.1. 形成 sync 队列, 第一个进入 sync 队列的leader为本阶段的laeder, 其他flush队列加入sync队列的leader线程将被阻塞, 直到commit阶段被leader线程唤醒, finish_commit
6.2. 释放 Lock log mutex, 获取 Lock sync mutex
6.3. 根据 delay 的设置来决定是否延迟一段时间, 使得sync队列变大, last commit是在binlog prepare时生成, 这时last commit尚未修改, 因此加入sync队列的事务是同一组事务, 可以提高从库 mts 效率。
6.4. fetch sync队列, 对 sync 队列进行固化
6.5. sync binlog file到磁盘中, 需要根据sync_binlog的设置来决定是否刷盘
6.6. 如果 sync_binlog = 1, 那么更新 binlog end pos, 通知 dump线程发送 event
7. commit阶段:
如果需要按顺序提交: order_commits:
7.1. sync队列加入commit队列, 第一个进入的 sync 队列的leader为本阶段的leader, 其他sync队列加入commit队列的leader会被阻塞, 直到 commit阶段后被leader线程唤醒, 进入 finish commit
7.2. 释放 Lock sync mutex,获取 lock commit mutex
7.3. fetch commit队列, 对 commit 队列进行固化
7.4. 调用 after_sync hook: 这里, 如果半同步复制为 after_sync, 则需要等待dump线程收到从节点对于commit队列中最大的binlog filename和 pos的ack。
7.5. 在 Innodb 层提交之前变更 last_commit, 将其变更为 commit 队列中最大的 seqno
7.6. COMMIT队列中每个事务按照顺序进行存储引擎层提交
7.7. 变更 gtid_executed
7.8. 释放 lock commit mutex
如果不需要按顺序提交:
7.1. 调用 after_sync hook: 这里, 如果半同步复制为 after_sync, 则需要等待dump线程收到从节点对于commit队列中最大的binlog filename和 pos的ack。
8. 收尾:
8.1. leader 线程唤醒组内成员, 进行各自操作
8.2. commit 队列中事务清空 binlog cache 临时文件和内存
8.3. 如果不需要按顺序提交:则commit队列中线程各自进行存储引擎层的提交, 提交完成之后更新 gtid_executed
8.4. 决定是否进行 binlog 的 rotate
8.5. 如果 rotate 了 binlog, 则根据 expire_log_days 判断是否需要清理 binlog
二. 流程代码。
主代码如下,删除了部分辅助代码,从 trans_commit(THD *thd) 函数开始。
bool trans_commit(THD *thd)
{
// 提交事务。
res = ha_commit_trans(thd, TRUE);
if (res == FALSE)
if (thd->rpl_thd_ctx.session_gtids_ctx().notify_after_transaction_commit(thd))
sql_print_warning("Failed to collect GTID to send in the response packet!");
thd->server_status &= ~SERVER_STATUS_IN_TRANS;
thd->variables.option_bits &= ~OPTION_BEGIN;
thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION);
thd->lex->start_transaction_opt = 0;
/* The transaction should be marked as complete in P_S. */
assert(thd->m_transaction_psi == NULL);
thd->tx_priority = 0;
trans_track_end_trx(thd);
DBUG_RETURN(MY_TEST(res));
}
/*
提交事务。
server层最后调用函数 ha_commit_trans(), 该函数负责处理 binlog 层和存储引擎层的提交。
*/
int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock)
{
// 读写事务 && 不能忽略全局读锁
if (rw_trans && !ignore_global_read_lock)
{
/*
获取一个 MDL_KEY::COMMIT 元数据锁, 该元数据锁将确保 commit 操作会被活跃的 FTWRL 锁阻止。
FTWRL锁会阻塞 COMMIT 操作。
*/
MDL_REQUEST_INIT(&mdl_request,
MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
DBUG_PRINT("debug", ("Acquire MDL commit lock"));
// 申请 MDL_key::COMMIT 锁, 申请失败
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
DBUG_RETURN(1);
}
release_mdl = true;
}
// 判断是否开启 xa 事务;
// 所有的 entries 都支持 2pc && 在事务 scope 中设置做读写更改的引擎数量 > 1
if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))
// prepare; 在事务协调器中 prepare commit tx, 在引擎层生成一个 XA 事务。
// tc_log: mysqld启动时生成的 MySQL_BIN_LOG 对象[XA控制对象]。
error = tc_log->prepare(thd, all);
}
/*
XA 事务的状态变更为 prepared, 中间态。最终会变成常规的 NOTR 状态。
*/
if (!error && all && xid_state->has_state(XID_STATE::XA_IDLE))
{
assert(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
static_cast
// 设置 XA 事务状态为 XA_PREPARED 状态。
xid_state->set_state(XID_STATE::XA_PREPARED);
}
/**
* XA 事务提交
*/
if (error || (error = tc_log->commit(thd, all)))
{
ha_rollback_trans(thd, all);
error = 1;
goto end;
}
end:
// 释放 mdl 锁。
if (release_mdl && mdl_request.ticket)
{
thd->mdl_context.release_lock(mdl_request.ticket);
}
/*
* 释放资源并执行其他清理。空事务也需要。
*/
if (is_real_trans)
{
trn_ctx->cleanup();
thd->tx_priority = 0;
}
}
int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
{
/*
设置 HA_IGNORE_DURABILITY 在 prepare 阶段不将事务的 prepare record 刷到 innodb redo log。
这样在 binlog 组提交的 flush 阶段 flushing binlog 之前 flush prepare record 到 innodb redo log。
在 innodb prepare 时, 不刷 redo log.
*/
thd->durability_property = HA_IGNORE_DURABILITY;
// 在引擎中 prepare commit trx
int error = ha_prepare_low(thd, all);
DBUG_RETURN(error);
}
/**
* prepare commit trx
* 在引擎层 prepare commit trx
* 包括 binlog引擎 和 innodb引擎
*/
int ha_prepare_low(THD *thd, bool all)
{
// 遍历引擎
if (ha_info)
{
for (; ha_info && !error; ha_info = ha_info->next())
{
int err = 0;
// 引擎
handlerton *ht = ha_info->ht();
/*
如果这个特定事务是只读的, 不要调用两阶段提交。
*/
if (!ha_info->is_trx_read_write())
continue;
/**
* 调用引擎的 prepare 在存储层生成 XA 事务。
* 先 binlog prepare, 再 innodb prepare;
* binlog prepare: 将上一次 commit 队列中最大的 seq num 写入本次事务的 last_commit 中
* innodb prepare: 在 innodb 中更改 undo 日志段的状态为 trx_undo_prepared, 并将 xid 写入 undo log header。
* */
if ((err = ht->prepare(ht, thd, all)))
{
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
error = 1;
}
// ha_prepare_count++
thd->status_var.ha_prepare_count++;
}
}
}
/*
* binlog prepare;
*/
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
if (!all)
{
// 将上一次 commit 队列中最大的 seq number 写入本次事务的 last_commit 中。
thd->get_transaction()->store_commit_parent(mysql_bin_log.m_dependency_tracker.get_max_committed_timestamp());
}
DBUG_RETURN(all && is_loggable_xa_prepare(thd) ? mysql_bin_log.commit(thd, true) : 0);
}
/*******************************************************************/ /**
Innodb prepare 一个 X/Open XA 分布式事务。
static int
innobase_xa_prepare(
/*================*/
handlerton *hton, /*!< in: InnoDB handlerton ; innodb引擎 */
THD *thd, /*!< in: handle to the MySQL thread of
the user whose XA transaction should
be prepared ; mysql线程 */
bool prepare_trx) /*!< in: true - prepare transaction
false - the current SQL statement
ended ; true: prepare 事务
false: 当前 SQL 语句结束, 语句级别的提交 */
{
// trx
trx_t *trx = check_trx_exists(thd);
// 获取thd的 xid, 同时设置到 trx -> xid 中
thd_get_xid(thd, (MYSQL_XID *)trx->xid);
/\* 释放可能的 FIFO ticket 和 search latch。
因为我们要保留 trx\_sys -> mutex, 我们必须首先释放 search system latch 来遵守锁存顺序。
\*/
trx\_search\_latch\_release\_if\_reserved(trx);
// prepare trx
if (prepare\_trx || (!thd\_test\_options(thd, OPTION\_NOT\_AUTOCOMMIT | OPTION\_BEGIN)))
{
/\* preapre 整个事务, 或者这是一个SQL语句结束, autocommit 是打开状态 \*/
// 事务已经在 mysql 2pc 协调器中注册。
ut\_ad(trx\_is\_registered\_for\_2pc(trx));
// trx prepare
dberr\_t err = trx\_prepare\_for\_mysql(trx);
}
else
{
/\* 语句的提交动作, 而非真正的事务提交。 \*/
// 需要释放语句 hold 的 auto\_increment 锁
lock\_unlock\_table\_autoinc(trx);
// 记录本语句的 undo 信息, 以便语句级的回滚
// 标记最新SQL语句结束。
trx\_mark\_sql\_stat\_end(trx);
}
return (0);
}
/**
* trx prepare
*/
dberr_t
trx_prepare_for_mysql(trx_t *trx)
{
trx->op_info = "preparing";
// prepare trx.
trx_prepare(trx);
}
/****************************************************************/ /**
prepare trx.*/
static void
trx_prepare(
/*========*/
trx_t *trx) /*!< in/out: transaction */
{
// 回滚段 != NULL && redo 段被修改
if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx))
{
// 为指定的回滚段 preapre 一个事务。lsn 为当前已 commit 的 lsn
lsn = trx_prepare_low(trx, &trx->rsegs.m_redo, false);
}
if (trx->rsegs.m\_noredo.rseg != NULL && trx\_is\_noredo\_rseg\_updated(trx))
{
// 为指定的回滚段 preapre 一个事务。
trx\_prepare\_low(trx, &trx->rsegs.m\_noredo, true);
}
/\*--------------------------------------\*/
// 事务状态为 TRX\_STATE\_ACTIVE 状态, 修改事务状态
trx->state = TRX\_STATE\_PREPARED;
// 事务系统中处于 xa prepared 状态的事务的数量
trx\_sys->n\_prepared\_trx++;
/\*--------------------------------------\*/
/\* Release read locks after PREPARE for READ COMMITTED
and lower isolation.
对 rc 隔离级别, 在 prepare 之后释放 read locks, 降低隔离度
\*/
if (trx->isolation\_level <= TRX\_ISO\_READ\_COMMITTED)
{
/\* Stop inheriting GAP locks.
停止继承 GAP lock。
\*/
trx->skip\_lock\_inheritance = true;
/\* Release only GAP locks for now.
释放 GAP lock。
\*/
lock\_trx\_release\_read\_locks(trx, true);
}
switch (thd\_requested\_durability(trx->mysql\_thd))
{
case HA\_IGNORE\_DURABILITY:
/\*
在 binlog group commit 的 prepare 阶段, 我们设置 HA\_IGNORE\_DURABILITY , 这样在这个阶段不会 flush redo log。
这样我们就可以在 binlog group commit 的 flush 阶段在将 binary log写入二进制日志之前, 在一个组中 flush redo log。
\*/
break;
case ..
}
}
/****************************************************************/ /**
为指定的回滚段 preapre 一个事务。 */
static lsn_t
trx_prepare_low(
/*============*/
trx_t *trx, /*!< in/out: transaction */
trx_undo_ptr_t *undo_ptr, /*!< in/out: pointer to rollback
segment scheduled for prepare. 指向回滚段的指针 */
bool noredo_logging) /*!< in: turn-off redo logging. 不需要redo log */
{
lsn_t lsn;
// insert 或者 undo 回滚段不为 NULL
if (undo_ptr->insert_undo != NULL || undo_ptr->update_undo != NULL)
{
// start a sync mtr
mtr_start_sync(&mtr);
// 设置 mtr mode
if (noredo_logging)
{
mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
}
/\*
将 undo 日志段状态从 trx\_undo\_active 修改为 trx\_undo\_prepared:
更改 undo 回滚段将其设置为 prepare 状态。
\*/
mutex\_enter(&rseg->mutex);
// insert undo log 不为 NULL
if (undo\_ptr->insert\_undo != NULL)
{
/\*
这里不需要获取 trx->undo\_mutex, 因为只允许一个 OS 线程为该事务做事务准备。
\*/
// 将 undo 日志段状态从 trx\_undo\_active 修改为 trx\_undo\_prepared 状态
trx\_undo\_set\_state\_at\_prepare(
trx, undo\_ptr->insert\_undo, false, &mtr);
}
// 将 undo 日志段状态从 trx\_undo\_active 修改为 trx\_undo\_prepared 状态
if (undo\_ptr->update\_undo != NULL)
{
trx\_undo\_set\_state\_at\_prepare(
trx, undo\_ptr->update\_undo, false, &mtr);
}
mutex\_exit(&rseg->mutex);
lsn = mtr.commit\_lsn();
}
else
{
lsn = 0;
}
return (lsn);
}
/* 修改 undo 日志段的状态*/
page_t *
trx_undo_set_state_at_prepare(
trx_t *trx,
trx_undo_t *undo,
bool rollback,
mtr_t *mtr)
{
// 获取 undo page 页, 并在其上加 x-latch
undo_page = trx_undo_page_get(
page_id_t(undo->space, undo->hdr_page_no),
undo->page_size, mtr);
// undo 段 header
seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
// 如果是 XA rollback
if (rollback)
{
ut_ad(undo->state == TRX_UNDO_PREPARED);
// 将 undo 段的状态从 TRX_UNDO_PREPARED 修改为 TRX_UNDO_ACTIVE 状态
mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, TRX_UNDO_ACTIVE,
MLOG_2BYTES, mtr);
return (undo_page);
}
/*------------------------------*/
// 是 XA prepare, 则将 undo 段的状态从 TRX_UNDO_ACTIVE 修改为 TRX_UNDO_PREPARED, 并将 xid 写入 undo。
ut_ad(undo->state == TRX_UNDO_ACTIVE);
undo->state = TRX_UNDO_PREPARED;
undo->xid = *trx->xid;
/*------------------------------*/
// 在 undo 段中更新当前 undo 段的状态
mlog_write_ulint(seg_hdr + TRX_UNDO_STATE, undo->state,
MLOG_2BYTES, mtr);
// 在 undo 段 last undo log header 中写入 xid
offset = mach_read_from_2(seg_hdr + TRX_UNDO_LAST_LOG);
undo_header = undo_page + offset;
mlog_write_ulint(undo_header + TRX_UNDO_XID_EXISTS,
TRUE, MLOG_1BYTE, mtr);
trx_undo_write_xid(undo_header, &undo->xid, mtr);
return (undo_page);
}
/*
在事务协调器中提交事务。
该函数将在二进制日志和存储引擎中提交事务。
*/
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
{
……..
{
/* The prepare phase of XA transaction two phase logging. */
int err = 0;
bool one_phase = get_xa_opt(thd) == XA_ONE_PHASE;
assert(thd->lex->sql\_command != SQLCOM\_XA\_COMMIT || one\_phase);
// xid event 生成并写入 binlog cache, 在真正的写操作语句生成的event之后
XID\_STATE \*xs = thd->get\_transaction()->xid\_state();
XA\_prepare\_log\_event end\_evt(thd, xs->get\_xid(), one\_phase);
err = cache\_mngr->trx\_cache.finalize(thd, &end\_evt, xs);
........
}
trx\_stuff\_logged = true;
}
........
// 提交。
// ordered\_commit: 事务在 binlog 阶段提交的核心函数。
if (ordered\_commit(thd, all, skip\_commit))
DBUG\_RETURN(RESULT\_INCONSISTENT);
/\*
Mark the flag m\_is\_binlogged to true only after we are done
with checking all the error cases.
检查完所有错误情况后, 将标记 m\_is\_binlogged 标记为 true.
\*/
if (is\_loggable\_xa\_prepare(thd))
thd->get\_transaction()->xid\_state()->set\_binlogged();
}
DBUG_RETURN(RESULT_SUCCESS);
}
/**
Flush and commit the transaction.
This will execute an ordered flush and commit of all outstanding
transactions and is the main function for the binary log group
commit logic. The function performs the ordered commit in two
phases.
The first phase flushes the caches to the binary log and under
LOCK_log and marks all threads that were flushed as not pending.
The second phase executes under LOCK_commit and commits all
transactions in order.
The procedure is:
#ifdef HAVE_REPLICATION
/**
* 先形成 flush 队列, 非 leader 线程将被阻塞, 直到 commit 阶段被 leader 线程唤醒。
* 然后leader线程获取 Lock log锁
*/
if (has_commit_order_manager(thd))
{
Slave_worker *worker = dynamic_cast
Commit_order_manager *mngr = worker->get_commit_order_manager();
if (mngr->wait\_for\_its\_turn(worker, all))
{
thd->commit\_error = THD::CE\_COMMIT\_ERROR;
DBUG\_RETURN(thd->commit\_error);
}
// 获取 Lock\_log 锁, 非 leader 线程将被阻塞, 直到commit之后被 leader 线程唤醒, 非 leader 线程这里返回 true, 线程应该等待提交完成。
if (change\_stage(thd, Stage\_manager::FLUSH\_STAGE, thd, NULL, &LOCK\_log))
DBUG\_RETURN(finish\_commit(thd));
}
else
#endif
// 获取 Lock_log 锁, 非 leader 线程将被阻塞, 直到被 leader 线程唤醒, 非 leader 线程这里返回 true, 线程应该等待提交完成。
if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
{
DBUG_RETURN(finish_commit(thd));
}
THD *wait_queue = NULL, *final_queue = NULL;
mysql_mutex_t *leave_mutex_before_commit_stage = NULL;
my_off_t flush_end_pos = 0;
bool update_binlog_end_pos_after_sync;
DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage");
// 执行 flush 阶段操作。
/*
* 1. 对 flush 队列进行 fetch, 本次处理的flush队列就固定了
2. 在 innodb 存储引擎中 flush redo log, 做 innodb 层 redo 持久化。
3. 为 flush 队列中每个事务生成 gtid。
4. 将 flush队列中每个线程的 binlog cache flush 到 binlog 日志文件中。这里包含两步:
1. 将事务的 GTID event直接写入 binlog 磁盘文件中
2. 将事务生成的别的 event 写入 binlog file cache 中
*/
flush_error = process_flush_stage_queue(&total_bytes, &do_rotate,
&wait_queue);
// 将 binary log cache(IO cache) flush到文件中
if (flush_error == 0 && total_bytes > 0)
flush_error = flush_cache_to_file(&flush_end_pos);
// sync_binlog 是否等于 1
update_binlog_end_pos_after_sync = (get_sync_period() == 1);
/*
如果 flush 操作成功, 则调用 after_flush hook。
*/
if (flush_error == 0)
{
const char *file_name_ptr = log_file_name + dirname_length(log_file_name);
assert(flush_end_pos != 0);
if (RUN_HOOK(binlog_storage, after_flush,
(thd, file_name_ptr, flush_end_pos)))
{
sql_print_error("Failed to run 'after_flush' hooks");
flush_error = ER_ERROR_ON_WRITE;
}
// 不等于 1, 通知 dump 线程
if (!update_binlog_end_pos_after_sync)
// 更新 binlog end pos, 通知 dump 线程向从库发送 event
update_binlog_end_pos();
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
}
if (flush_error)
{
/*
Handle flush error (if any) after leader finishes it's flush stage.
如果存在 flush 错误, 则处理 flush错误
*/
handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */,
(thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR)
? ER(ER_GNO_EXHAUSTED)
: NULL);
}
/*
Stage #2: Syncing binary log file to disk
sync binary log file to disk.
*/
/** 释放 Lock_log mutex, 获取 Lock_sync mutex
* 第一个进入的 flush 队列的 leader 为本阶段的 leader, 其他 flush 队列加入 sync 队列, 其他 flush 队列的
* leader会被阻塞, 直到 commit 阶段被 leader 线程唤醒。
* */
if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
{
DBUG_RETURN(finish_commit(thd));
}
/*
根据 delay 的设置来决定是否延迟一段时间, 如果 delay 的时间越久, 那么加入 sync 队列的
事务就越多【last commit 是在 binlog prepare 时生成的, 尚未更改, 因此加入 sync 队列的
事务是同一组事务】, 提高了从库 mts 的效率。
*/
if (!flush_error && (sync_counter + 1 >= get_sync_period()))
stage_manager.wait_count_or_timeout(opt_binlog_group_commit_sync_no_delay_count,
opt_binlog_group_commit_sync_delay,
Stage_manager::SYNC_STAGE);
// fetch sync 队列, 对 sync 队列进行固化。
final_queue = stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
// 这里 sync_binlog file到磁盘中
if (flush_error == 0 && total_bytes > 0)
{
// 根据 sync_binlog 的设置决定是否刷盘
std::pair
sync_error = result.first;
}
// 在这里 sync_binlog = 1, 更新 binlog end_pos, 通知 dump 线程发送 event
if (update_binlog_end_pos_after_sync)
{
THD *tmp_thd = final_queue;
const char *binlog_file = NULL;
my_off_t pos = 0;
while (tmp_thd->next_to_commit != NULL)
tmp_thd = tmp_thd->next_to_commit;
if (flush_error == 0 && sync_error == 0)
{
tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
// 更新 binlog end pos, 通知 dump 线程
update_binlog_end_pos(binlog_file, pos);
}
}
leave_mutex_before_commit_stage = &LOCK_sync;
/*
Stage #3: Commit all transactions in order.
按顺序在 Innodb 层提交所有事务。
如果我们不需要对提交顺序进行排序, 并且每个线程必须执行 handlerton 提交, 那么这个阶段可以跳过。
然而, 由于我们保留了前一阶段的锁, 如果我们跳过这个阶段, 则必须进行解锁。
*/
commit_stage:
// 如果需要顺序提交
if (opt_binlog_order_commits &&
(sync_error == 0 || binlog_error_action != ABORT_SERVER))
{
// SYNC队列加入 COMMIT 队列, 第一个进入的 SYNC 队列的 leader 为本阶段的 leader。其他 sync 队列
// 加入 commit 队列的 leade 会被阻塞, 直到 COMMIT 阶段后被 leader 线程唤醒。
// 释放 lock_sync mutex, 持有 lock_commit mutex.
if (change_stage(thd, Stage_manager::COMMIT_STAGE,
final_queue, leave_mutex_before_commit_stage,
&LOCK_commit))
{
DBUG_RETURN(finish_commit(thd));
}
// 固化 commit 队列
THD *commit_queue = stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
// 调用 after_sync hook
if (flush_error == 0 && sync_error == 0)
// 调用 after_sync hook.注意:对于after_sync, 这里将等待binlog dump 线程收到slave节点关于队列中事务最新的 binlog_file和 binlog_pos的ACK。
sync_error = call_after_sync_hook(commit_queue);
/\*
process\_commit\_stage\_queue 将为队列中每个 thd 持有的 GTID
调用 update\_on\_commit 或 update\_on\_rollback。
这样做的目的是确保 gtid 按照顺序添加到 GTIDs中, 避免出现不必要的间隙
如果我们只允许每个线程在完成提交时调用 update\_on\_commit, 则无法保证 GTID
顺序, 并且 gtid\_executed 之间可能出现空隙。发生这种情况, server必须从
Gtid\_set 中添加和删除间隔, 添加或删除间隔需要一个互斥锁, 这会降低性能。
\*/
process\_commit\_stage\_queue(thd, commit\_queue);
// 退出 Lock\_commit 锁
mysql\_mutex\_unlock(&LOCK\_commit);
/\*
Process after\_commit after LOCK\_commit is released for avoiding
3-way deadlock among user thread, rotate thread and dump thread.
在 LOCK\_commit 释放之后处理 after\_commit 来避免 user thread, rotate thread 和 dump thread的
3路死锁。
\*/
process\_after\_commit\_stage\_queue(thd, commit\_queue);
final\_queue = commit\_queue;
}
else
{
// 释放锁, 调用 after_sync hook.
if (leave_mutex_before_commit_stage)
mysql_mutex_unlock(leave_mutex_before_commit_stage);
if (flush_error == 0 && sync_error == 0)
sync_error = call_after_sync_hook(final_queue);
}
/*
Handle sync error after we release all locks in order to avoid deadlocks
为了避免死锁, 在释放所有的 locks 之后处理sync error
*/
if (sync_error)
handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */, NULL);
/* Commit done so signal all waiting threads
commit完成之后通知所有处于 wait 状态的线程
*/
stage_manager.signal_done(final_queue);
/*
Finish the commit before executing a rotate, or run the risk of a
deadlock. We don't need the return value here since it is in
thd->commit_error, which is returned below.
在执行 rotate 之前完成commit, 否则可能出现死锁。
*/
(void)finish_commit(thd);
/*
If we need to rotate, we do it without commit error.
Otherwise the thd->commit_error will be possibly reset.
rotate
*/
if (DBUG_EVALUATE_IF("force_rotate", 1, 0) ||
(do_rotate && thd->commit_error == THD::CE_NONE &&
!is_rotating_caused_by_incident))
{
/*
如果需要进行 binlog rotate, 则进行 rotate 操作。
*/
DEBUG\_SYNC(thd, "ready\_to\_do\_rotation");
bool check\_purge = false;
mysql\_mutex\_lock(&LOCK\_log);
/\*
If rotate fails then depends on binlog\_error\_action variable
appropriate action will be taken inside rotate call.
\*/
int error = rotate(false, &check\_purge);
mysql\_mutex\_unlock(&LOCK\_log);
if (error)
thd->commit\_error = THD::CE\_COMMIT\_ERROR;
else if (check\_purge)
// rotate判断是否需要 expire binlog.
purge();
}
/*
flush or sync errors are handled above (using binlog_error_action).
Hence treat only COMMIT_ERRORs as errors.
*/
DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR);
}
/**
Enter a stage of the ordered commit procedure.
进入有序提交阶段。
Entering is stage is done by:
bool MYSQL_BIN_LOG::change_stage(THD *thd,
Stage_manager::StageID stage, THD *queue,
mysql_mutex_t *leave_mutex,
mysql_mutex_t *enter_mutex)
{
assert(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
assert(enter_mutex);
assert(queue);
/*
一旦会话入队, enroll_for 将释放 leave_mutex
*/
// 当前线程非 leader 线程, 非 leader 线程将被阻塞, 直到 commit 阶段被 leader 线程唤醒。
if (!stage_manager.enroll_for(stage, queue, leave_mutex))
{
assert(!thd_get_cache_mngr(thd)->dbug_any_finalized());
DBUG_RETURN(true);
}
/*
* 以下是 leader, 获取 enter_mutex
*/
bool need_lock_enter_mutex =
!(is_rotating_caused_by_incident && enter_mutex == &LOCK_log);
if (need_lock_enter_mutex)
mysql_mutex_lock(enter_mutex);
else
mysql_mutex_assert_owner(enter_mutex);
DBUG_RETURN(false);
}
// 返回是否是 leader 线程, 非 leader 线程将被阻塞, 直到 commit阶段被 leader线程唤醒
bool Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
{
// 如果队列是空的, 那么我们就是 leader
bool leader = m_queue[stage].append(thd);
#ifdef HAVE_REPLICATION
// 如果处于 flush 阶段并且存在 commit_order
if (stage == FLUSH_STAGE && has_commit_order_manager(thd))
{
// slave worker线程
Slave_worker *worker = dynamic_cast
// slave worker 线程的 commit_order_manager
Commit_order_manager *mngr = worker->get_commit_order_manager();
// 取消注册事务, worker为执行该事务的线程
mngr->unregister_trx(worker);
}
#endif
/*
如果 state_mutex 是 Lock_log, 那么当 binlog rotate时不应该解决, 在 rotation 时应该保持这个锁。
*/
bool need_unlock_stage_mutex =
!(mysql_bin_log.is_rotating_caused_by_incident &&
stage_mutex == mysql_bin_log.get_log_lock());
if (stage_mutex && need_unlock_stage_mutex)
mysql_mutex_unlock(stage_mutex);
/*
如果 queue不是empty, 那么我们是跟随者并等待leader处理这个queue。
如果我们持有一个 mutex, 那必须在 sleep 之前释放这个mutex
*/
// 非 leader 线程将被阻塞
if (!leader)
{
mysql_mutex_lock(&m_lock_done);
#ifndef NDEBUG
/*
Leader can be awaiting all-clear to preempt follower's execution.
With setting the status the follower ensures it won't execute anything
including thread-specific code.
*/
thd->get_transaction()->m_flags.ready_preempt = 1;
if (leader_await_preempt_status)
mysql_cond_signal(&m_cond_preempt);
#endif
while (thd->get_transaction()->m_flags.pending)
mysql_cond_wait(&m_cond_done, &m_lock_done);
mysql_mutex_unlock(&m_lock_done);
}
return leader;
}
/*
执行 flush 阶段。
*/
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var)
{
// 需要持有 LOCK_log 锁
mysql_mutex_assert_owner(&LOCK_log);
/*
获取整个 flush 队列并清空flush队列, 对 flush 队列进行固化。同时便于下一批事务可以使用 flush 队列。
*/
THD *first_seen = stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
/*
在 innodb 存储引擎中 flush redo log, 做 innodb 层 redo 持久化。
*/
ha_flush_logs(NULL, true);
// 为 flush 队列中每个事务生成 gtid
assign_automatic_gtids_to_flush_group(first_seen);
/*
刷新每个线程的 binlog cache 到 binlog 文件中。
*/
for (THD *head = first_seen; head; head = head->next_to_commit)
{
// 将线程的 binlog cache 刷新到 binlog 日志文件
// 首先将 gtid event 直接写入 binlog。
// 再将 other event 写入到 binlog file cache 中
std::pair
total_bytes += result.second;
}
*out_queue_var = first_seen;
*total_bytes_var = total_bytes;
// 判断 binlog 文件是否需要切换
// 这里是在写入之后判断的, 因此一个事务的binlog都位于同一个文件中
if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t)max_size)
*rotate_var = true;
}
THD *fetch_queue_for(StageID stage)
{
return m_queue[stage].fetch_and_empty();
}
/** Flush InnoDB redo logs to the file system.
* flush innodb redo logs到文件系统中。
如果在 flush 阶段被 binlog 组提交调用, 则为 true, 其他情况为 false.
@return false */
static bool
innobase_flush_logs(
handlerton *hton,
bool binlog_group_flush)
{
DBUG_ENTER("innobase_flush_logs");
assert(hton == innodb_hton_ptr);
// read only, return false
if (srv_read_only_mode)
{
DBUG_RETURN(false);
}
// 如果 binlog\_group\_flush 为 true, 则我们在 flush 阶段被 binlog 组提交调用, 否则为 false。
// innodb\_flush\_log\_at\_trx\_commit = 0 的情况下, write & sync 每秒一次, 不需要在 binlog group commit时提交。
if (binlog\_group\_flush && srv\_flush\_log\_at\_trx\_commit == 0)
{
/\* innodb\_flush\_log\_at\_trx\_commit=0
(write and sync once per second).
Do not flush the redo log during binlog group commit. \*/
DBUG\_RETURN(false);
}
/\* 将 redo log buffer 刷新到 redo log file文件中。
如果在 flush logs 或者 srv\_flush\_log\_at\_trx\_commit = 1则进行 flush。
\*/
log\_buffer\_flush\_to\_disk(!binlog\_group\_flush || srv\_flush\_log\_at\_trx\_commit == 1);
DBUG\_RETURN(false);
}
/**
* 为 commit group 中每个事务生成 GTID
*/
bool MYSQL_BIN_LOG::assign_automatic_gtids_to_flush_group(THD *first_seen)
{
bool error = false;
bool is_global_sid_locked = false;
rpl_sidno locked_sidno = 0;
// 遍历 flush 队列中每个事务
for (THD *head = first_seen; head; head = head->next_to_commit)
{
/* Generate GTID
* 为每个事务生成 GTID
*/
if (head->variables.gtid_next.type == AUTOMATIC_GROUP)
{
if (!is_global_sid_locked)
{
global_sid_lock->rdlock();
is_global_sid_locked = true;
}
// 为指定的事务生成 GTID
if (gtid_state->generate_automatic_gtid(head,
head->get_transaction()->get_rpl_transaction_ctx()->get_sidno(),
head->get_transaction()->get_rpl_transaction_ctx()->get_gno(),
&locked_sidno) != RETURN_STATUS_OK)
}
else
{
if (head->variables.gtid_next.type == GTID_GROUP)
assert(head->owned_gtid.sidno > 0);
else
{
assert(head->variables.gtid_next.type == ANONYMOUS_GROUP);
assert(head->owned_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS);
}
}
}
DBUG_RETURN(error);
}
/**
Flush caches for session.
将线程的 binlog cache 刷新到 binlog 日志文件
首先将 gtid event 直接写入 binlog。
再将 other event 写入到 binlog cache 中
set_trans_pos 是一个指向二进制日志当前使用的文件名的指针调用, rotation 将改变这个变量的内容。
*/
std::pair
MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
{
// cache_mgr
binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
my_off_t bytes = 0;
bool wrote_xid = false;
// binlog cache 进行 flush 操作。
// 首先将 gtid event 直接写入 binlog。
// 再将 other event 写入到 binlog cache 中
int error = cache_mngr->flush(thd, &bytes, &wrote_xid);
if (!error && bytes > 0)
{
thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
if (wrote_xid)
inc_prep_xids(thd);
}
DBUG_PRINT("debug", ("bytes: %llu", bytes));
return std::make_pair(error, bytes);
}
/*
Convenience method to flush both caches to the binary log.
// 对 binlog cache 进行 flush 操作。
*/
int flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
my_off_t stmt_bytes = 0;
my_off_t trx_bytes = 0;
assert(stmt_cache.has_xid() == 0);
// 对指定线程的 binlog cache 进行 flush 操作。
// 首先将 gtid event 直接写入 binlog。
// 再将 other event 写入到 binlog cache 中。
int error = stmt_cache.flush(thd, &stmt_bytes, wrote_xid);
if (error)
return error;
DEBUG_SYNC(thd, "after_flush_stm_cache_before_flush_trx_cache");
if (int error = trx_cache.flush(thd, &trx_bytes, wrote_xid))
return error;
*bytes_written = stmt_bytes + trx_bytes;
return 0;
}
/**
Flush caches to the binary log.
将缓存 flush 到二进制日志文件。
首先将 gtid event 直接写入 binlog。
再将 other event 写入到 binlog cache 中。
If the cache is finalized, the cache will be flushed to the binary
log file. If the cache is not finalized, nothing will be done.
如果 cache 确定, 则把 cache flush到二进制日志文件。如果 cache 尚未最终确定, 则不做任何操作。
If flushing fails for any reason, an error will be reported and the
cache will be reset. Flushing can fail in two circumstances:
如果由于任何原因 flush 失败, 则报告错误并重置缓存。flush可能存在如下两种方式的失败:
It was not possible to write the cache to the file. In this case,
it does not make sense to keep the cache.
无法将 cache 写入文件, 这种情况下, 保存缓存是没有意义的。
The cache was successfully written to disk but post-flush actions
(such as binary log rotation) failed. In this case, the cache is
already written to disk and there is no reason to keep it.
cache已成功写入磁盘, 但是之后的操作失败, 例如 binlog rotation。这种情况下,
缓存已经写入了磁盘, 没有必要保留他。
@see binlog_cache_data::finalize
*/
int binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
int error = 0;
if (flags.finalized)
{
// bytes_in_cache
my_off_t bytes_in_cache = my_b_tell(&cache_log);
// trx
Transaction_ctx *trn_ctx = thd->get_transaction();
// seq_no
trn_ctx->sequence_number = mysql_bin_log.m_dependency_tracker.step();
// last_committed
if (trn_ctx->last_committed == SEQ_UNINIT)
trn_ctx->last_committed = trn_ctx->sequence_number - 1;
/*
如果事务已经写入语句缓存, 则在flush语句缓存之前写入 GTID 信息;
*/
Binlog_event_writer writer(mysql_bin_log.get_log_file());
/* The GTID ownership process might set the commit_error */
error = (thd->commit_error == THD::CE_FLUSH_ERROR ||
thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);
if (!error)
// gtid 写入 binlog 文件, 这里直接写入磁盘 binlog 文件, 没有写入 binlog file cache
if ((error = mysql_bin_log.write_gtid(thd, this, &writer)))
thd->commit_error = THD::CE_FLUSH_ERROR;
if (!error)
// 将其他 event 写入 binlog file cache
error = mysql_bin_log.write_cache(thd, this, &writer);
if (flags.with_xid && error == 0)
*wrote_xid = true;
reset();
if (bytes_written)
*bytes_written = bytes_in_cache;
}
assert(!flags.finalized);
DBUG_RETURN(error);
}
/**
Flush the I/O cache to file.
将 IO cache flush到文件中。
如果写入了任何字节, 则将binlog cache flush到文件。
如果 flush 成功, 则发出 flush 成功信号。
*/
int MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
{
if (flush_io_cache(&log_file))
{
THD *thd = current_thd;
thd->commit_error = THD::CE_FLUSH_ERROR;
return ER_ERROR_ON_WRITE;
}
*end_pos_var = my_b_tell(&log_file);
return 0;
}
/**
* 更新 binlog end pos 点位, 通知 dump 线程发送 binlog
*/
void update_binlog_end_pos()
{
/*
binlog_end_pos 仅在 master binlog 上使用。
*/
if (is_relay_log)
signal_update();
else
{
// 持有 binlog end pos mutex.
lock_binlog_end_pos();
binlog_end_pos = my_b_tell(&log_file);
// 通知 dump 线程
signal_update();
// 释放 binlog end pos mutex
unlock_binlog_end_pos();
}
}
void signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
signal_cnt++;
mysql_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
/**
Call fsync() to sync the file to disk.
sync binlog file 到磁盘中。
根据 sync_binlog 的设置决定是否刷盘。
*/
std::pair
MYSQL_BIN_LOG::sync_binlog_file(bool force)
{
bool synced = false;
unsigned int sync_period = get_sync_period();
if (force || (sync_period && ++sync_counter >= sync_period))
{
sync_counter = 0;
if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1,
mysql_file_sync(log_file.file,
MYF(MY_WME | MY_IGNORE_BADFD))))
{
THD *thd = current_thd;
thd->commit_error = THD::CE_SYNC_ERROR;
return std::make_pair(true, synced);
}
synced = true;
}
return std::make_pair(false, synced);
}
/**
Auxiliary function used in ordered_commit.
调用 after_sync hook. 增强半同步复制相关 after_sync 相关。
等待 dump 线程收到 slave 对于队列中最大的 binlog_file 和 binlog_pos 的 ACK。
*/
static inline int call_after_sync_hook(THD *queue_head)
{
const char *log_file = NULL;
my_off_t pos = 0;
if (NO_HOOK(binlog_storage))
return 0;
assert(queue_head != NULL);
// 遍历 queue 中的线程, 获取到queue队列中事务最大的 binlog file 和 pos
for (THD *thd = queue_head; thd != NULL; thd = thd->next_to_commit)
if (likely(thd->commit_error == THD::CE_NONE))
thd->get_trans_fixed_pos(&log_file, &pos);
// 等待 dump 线程收到最大的 binlog file 和 pos 的 ACK。
if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
{
return ER_ERROR_ON_WRITE;
}
return 0;
}
/**
提交一系列会话。
这个函数用于提交从 first 开始的队列中的会话。
如果 ordered commit 的 flushing 阶段出错, 则会传入err code, 并标记所有线程。
还会将事务的 GTID 添加到 gtid_executed 中。
*/
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
{
// 持有 lock_commit lock.
mysql_mutex_assert_owner(&LOCK_commit);
// 遍历 commit 队列中的线程
for (THD *head = first; head; head = head->next_to_commit)
{
/*
如果 flush 失败, 则为 session 设置 commit_error, 跳过改事务并继续下一个事务。
这会将所有的线程标记失败, 因为 flush 失败。
如果 flush 成功, 链接上session并在引擎中提交。
*/
// seq_no != 0
if (head->get_transaction()->sequence_number != SEQ_UNINIT)
{
// lock LOCK_slave_trans_dep_tracker
mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
// 更新提交的最大的 seqno, last_commit
m_dependency_tracker.update_max_committed(head);
mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
}
/*
应忽略 flush/sync 错误并继续提交阶段。
因此此时 thd->commit_error 不能是 COMMIT_ERROR。
*/
// 是否是一个真正的提交
bool all = head->get\_transaction()->m\_flags.real\_commit;
// 如果 commit\_low
if (head->get\_transaction()->m\_flags.commit\_low)
{
/\*
storage engine commit
存储引擎提交。
\*/
if (ha\_commit\_low(head, all, false))
head->commit\_error = THD::CE\_COMMIT\_ERROR;
}
}
/*
Handle the GTID of the threads.
gtid_executed table is kept updated even though transactions fail to be
logged. That's required by slave auto positioning.
处理各session的 GTID。
即使无法记录事务, gtid_executed 表也会保持更新。这是 slave auto position 必须的。
*/
// 更新 gtid_executed
gtid_state->update_commit_group(first);
for (THD *head = first; head; head = head->next_to_commit)
{
/*
在存储引擎提交后减少准备好的 XID 计数器。
当遇到 flush 错误或session转储存储时, 还是需要减少准备好的 XID, 以避免用户线程, rotate线程
和 dump线程之间的三向死锁。
*/
if (head->get_transaction()->m_flags.xid_written)
// 减少 xid 计数器
dec_prep_xids(head);
}
}
/**
在 Innodb 中 commit 一个事务, 并标记 SQL 语句结束。*/
static int
innobase_commit(
/*============*/
handlerton *hton, /*!< in: InnoDB handlerton ; 存储引擎 */
THD *thd, /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed ; mysql thread headle */
bool commit_trx) /*!< in: true - commit transaction
false - the current SQL statement
ended ; true: 事务提交 false: 当前 SQL 语句结束 */
{
// 获取 mysql handler 对象的 innodb trx, 如果相应的 MySQL线程结构缺乏一个则创建一个 innodb事务对象。
trx_t *trx = check_trx_exists(thd);
TrxInInnoDB trx_in_innodb(trx);
// 如果事务已标记异步回滚
if (trx_in_innodb.is_aborted())
{
// 进行回滚
innobase_rollback(hton, thd, commit_trx);
DBUG_RETURN(convert_error_code_to_mysql(
DB_FORCED_ABORT, 0, thd));
}
/*
事务仅在 commit 或者回滚时取消注册。
如果取消注册, 我们不能立即释放资源, 我们可以立即返回。
目前, 虽然没有什么需要清理的, 但我们还是谨慎行事, 进行清理。
*/
if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx))
{
}
// read_only
bool read_only = trx->read_only || trx->id == 0;
// 如果是 事务 commit
if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
{
/*
我们正在提交整个事务, 或者这是一个SQL语句结束并且 autocommit = on;
我们需要当前的 binlog 位置才能让 mysqlbackup 工作。
*/
// 不是只读事务
if (!read_only)
{
// 依据参数 innobase_commit_concurrency 来判断是否有过多的线程同时提交。
while (innobase_commit_concurrency > 0)
{
// 获取 commit_cond_m mutex
mysql_mutex_lock(&commit_cond_m);
// commit_threads + 1
++commit_threads;
// 正在提交的事务小于 innodb_commit_concurrency, 则释放 commit_cond_m mutex, 进入提交流程
if (commit_threads <= innobase_commit_concurrency)
{
mysql_mutex_unlock(&commit_cond_m);
break;
}
--commit_threads;
mysql_cond_wait(&commit_cond, &commit_cond_m);
mysql_mutex_unlock(&commit_cond_m);
}
/\*
下面的调用读取正在提交的事务的 binlog pos.
其他引擎的二进制日志记录和 Innodb 无关, 因为 Innodb 要求的是提交 Innodb 事务在 MySQL 二进制日志
中的顺序和 Innodb 日志中的顺序相同, 这是由 server 保证的。
如果没有指定 binary log, 或者事务没有写 binlog, file name将是一个空指针。
\*/
ulonglong pos;
// 获取thd最新写入的 binlog 的 pos
thd\_binlog\_pos(thd, &trx->mysql\_log\_file\_name, &pos);
trx->mysql\_log\_offset = static\_cast<int64\_t>(pos);
/\* Don't do write + flush right now. For group commit
to work we want to do the flush later.
现在不要进行 flush, 对于组提交, 我们希望之后再 flush。
\*/
trx->flush\_log\_later = true;
}
// innobase commit, innodb层的提交
innobase\_commit\_low(trx);
// 非 read\_only
if (!read\_only)
{
trx->flush\_log\_later = false;
// 当前正在提交的线程 -1
if (innobase\_commit\_concurrency > 0)
{
mysql\_mutex\_lock(&commit\_cond\_m);
ut\_ad(commit\_threads > 0);
--commit\_threads;
mysql\_cond\_signal(&commit\_cond);
mysql\_mutex\_unlock(&commit\_cond\_m);
}
}
// trx提交完成, 取消注册
trx\_deregister\_from\_2pc(trx);
/\* Now do a write + flush of logs.
现在 write/flush logs.
\*/
if (!read\_only)
{
// write/flush 指定 trx lsn前的redo log到磁盘中。
// 这里会再次 flush redo log。
trx_commit_complete_for_mysql(trx);
}
}
else
{
/*
如果是非事务提交
仅仅需要释放指定SQL语句对应的表上的 auto-inc 锁。
*/
if (!read_only)
{
lock_unlock_table_autoinc(trx);
}
/*
存储当前事务的 undo_no, 以便如果需要roll back下一个SQL语句时直到rollback到哪里。
*/
trx_mark_sql_stat_end(trx);
}
/\* Reset the number AUTO-INC rows required \*/
trx->n\_autoinc\_rows = 0;
/\* This is a statement level variable. \*/
trx->fts\_next\_doc\_id = 0;
// 强制一个线程 leave innodb, 即使他有多余的 tickets.
innobase\_srv\_conc\_force\_exit\_innodb(trx);
DBUG\_RETURN(0);
}
/**
Process after commit for a sequence of sessions.
处理 after_commit.
*/
void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
// 遍历 commit 队列中的线程
for (THD *head = first; head; head = head->next_to_commit)
{
if (head->get_transaction()->m_flags.run_hooks &&
head->commit_error != THD::CE_COMMIT_ERROR)
{
/*
hook 可能会移动到 if 之外, 并且这可能是唯一的一处 after_commit 调用。
*/
bool all = head->get_transaction()->m_flags.real_commit;
(void)RUN_HOOK(transaction, after_commit, (head, all));
head->get_transaction()->m_flags.run_hooks = false;
}
}
}
/* finish commit **/
int MYSQL_BIN_LOG::finish_commit(THD *thd)
{
if (unlikely(!is_open()))
{
// 清空 binlog cache临时文件和内存
binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
if (cache_mngr)
cache_mngr->reset();
}
if (thd->get_transaction()->sequence_number != SEQ_UNINIT)
{
mysql_mutex_lock(&LOCK_slave_trans_dep_tracker);
// 更新 max_commit
m_dependency_tracker.update_max_committed(thd);
mysql_mutex_unlock(&LOCK_slave_trans_dep_tracker);
}
if (thd->get_transaction()->m_flags.commit_low)
{
const bool all = thd->get_transaction()->m_flags.real_commit;
/*
这里, flush error 和 sync error 将被忽略
*/
assert(thd->commit_error != THD::CE_COMMIT_ERROR);
/*
storage engine commit
存储引擎提交
*/
if (ha_commit_low(thd, all, false))
thd->commit_error = THD::CE_COMMIT_ERROR;
if (thd->get_transaction()->m_flags.xid_written)
dec_prep_xids(thd);
/*
after_commit hook
*/
if ((thd->commit_error != THD::CE_COMMIT_ERROR) &&
thd->get_transaction()->m_flags.run_hooks)
{
(void)RUN_HOOK(transaction, after_commit, (thd, all));
thd->get_transaction()->m_flags.run_hooks = false;
}
}
else if (thd->get_transaction()->m_flags.xid_written)
dec_prep_xids(thd);
if (!thd->owned_gtid.is_empty())
{
/*
更新 gtid_executed
*/
if (thd->commit_error == THD::CE_NONE)
{
gtid_state->update_on_commit(thd);
}
else
gtid_state->update_on_rollback(thd);
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章