The pgpool source tree contains a file named pgpool_main.c, which holds PgpoolMain, the main function that drives pgpool's operation and the related housekeeping.
The header libpcp_ext.h defines the limits for the database nodes pgpool manages in a cluster; the maximum number of backend nodes is fixed at 128 by the following macros:
#define MAX_NUM_BACKENDS 128
#define MAX_CONNECTION_SLOTS MAX_NUM_BACKENDS
#define MAX_DB_HOST_NAMELEN 128
#define MAX_PATH_LENGTH 256
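These limits bound the per-node settings pgpool reads from pgpool.conf, where each backend occupies one numbered slot from 0 up to MAX_NUM_BACKENDS-1. For example (the host name and data directory below are placeholders, not values taken from pgpool):
backend_hostname0 = 'db1.example.com'             # length capped by MAX_DB_HOST_NAMELEN
backend_port0 = 5432
backend_weight0 = 1                               # load-balance weight, normalized internally
backend_data_directory0 = '/var/lib/pgsql/data'   # length capped by MAX_PATH_LENGTH
backend_flag0 = 'ALLOW_TO_FAILOVER'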
pgpool defines the possible states of a backend database as follows:
typedef enum {
CON_UNUSED, /* unused slot */
CON_CONNECT_WAIT, /* waiting for connection starting */
CON_UP, /* up and running */
CON_DOWN /* down, disconnected */
} BACKEND_STATUS;
/* backend status name strings */
#define BACKEND_STATUS_CON_UNUSED "unused"
#define BACKEND_STATUS_CON_CONNECT_WAIT "waiting"
#define BACKEND_STATUS_CON_UP "up"
#define BACKEND_STATUS_CON_DOWN "down"
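The enum values pair one-to-one with these name strings. As a quick reference, here is a small helper that makes the mapping explicit (a sketch for illustration, not a function from pgpool):
/* Sketch: map a BACKEND_STATUS value to its display string. */
static const char *
backend_status_name(BACKEND_STATUS status)
{
    switch (status)
    {
        case CON_UNUSED:       return BACKEND_STATUS_CON_UNUSED;
        case CON_CONNECT_WAIT: return BACKEND_STATUS_CON_CONNECT_WAIT;
        case CON_UP:           return BACKEND_STATUS_CON_UP;
        case CON_DOWN:         return BACKEND_STATUS_CON_DOWN;
    }
    return "unknown";
}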
Each PostgreSQL backend in a pgpool cluster is described by the following structure:
/*
* PostgreSQL backend descriptor. Placed on shared memory area.
*/
typedef struct {
char backend_hostname[MAX_DB_HOST_NAMELEN]; /* backend host name */
int backend_port; /* backend port numbers */
BACKEND_STATUS backend_status; /* backend status */
double backend_weight; /* normalized backend load balance ratio */
double unnormalized_weight; /* descripted parameter */
char backend_data_directory[MAX_PATH_LENGTH];
unsigned short flag; /* various flags */
unsigned long long int standby_delay; /* The replication delay against the primary */
} BackendInfo;
typedef struct {
sig_atomic_t num_backends; /* Number of used PostgreSQL backends.
* This needs to be a sig_atomic_t type
* since it is replaced by a local
* variable while reloading pgpool.conf.
*/
BackendInfo backend_info[MAX_NUM_BACKENDS]; /* one entry per backend; sized to the maximum number of databases pgpool supports */
} BackendDesc;
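The failover code quoted below reaches this shared descriptor through macros from pool.h. Paraphrased (check pool.h for the exact definitions):
/* Paraphrased accessors over the shared BackendDesc (see pool.h) */
#define NUM_BACKENDS (pool_config->backend_desc->num_backends)
#define BACKEND_INFO(backend_id) \
    (pool_config->backend_desc->backend_info[(backend_id)])
/* VALID_BACKEND(i) and VALID_BACKEND_RAW(i) additionally require the
 * node's status to be CON_UP or CON_CONNECT_WAIT before the node is
 * treated as usable; their full definitions are more involved. */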
/*
* Calculate next valid master node id.
* If no valid node found, returns -1.
*/
The following function is what elects the next valid master node:
static int get_next_master_node(void)
{
int i;
for (i=0;i<pool_config->backend_desc->num_backends;i++)
{
/*
* Do not use VALID_BACKEND macro in raw mode.
* VALID_BACKEND return true only if the argument is master
* node id. In other words, standby nodes are false. So need
* to check backend status with VALID_BACKEND_RAW.
*/
if (RAW_MODE)
{
if (VALID_BACKEND_RAW(i))
break;
}
else
{
if (VALID_BACKEND(i))
break;
}
}
if (i == pool_config->backend_desc->num_backends)
i = -1;
return i;
}
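Once a failed node has been flipped to CON_DOWN, a rescan by this function yields the lowest-numbered node that is still usable; failover() below calls it exactly this way. A minimal illustrative fragment reusing the symbols above (not code from pgpool):
/* Illustration only: degenerate node 0, then elect the next master. */
BACKEND_INFO(0).backend_status = CON_DOWN;  /* node 0 just failed */
int new_master = get_next_master_node();    /* scans from 0; first valid node wins */
if (new_master < 0)
    ereport(LOG, (errmsg("no valid backend node found")));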
/*
* backend connection error, failover/failback request, if possible
* failover() must be called under protecting signals.
*/
static void failover(void)
{
int i, j, k;
int node_id;
int new_master;
int new_primary;
int nodes[MAX_NUM_BACKENDS];
bool need_to_restart_children;
bool partial_restart;
int status;
int sts;
bool need_to_restart_pcp = false;
bool all_backend_down = true;
ereport(DEBUG1,
(errmsg("failover handler called")));
memset(nodes, 0, sizeof(int) * MAX_NUM_BACKENDS);
/*
* this could happen in a child process if a signal has been sent
* before resetting signal handler
*/
if (getpid() != mypid)
{
ereport(DEBUG1,
(errmsg("failover handler called"),
errdetail("I am not parent")));
kill(pcp_pid, SIGUSR2);
return;
}
/*
* processing SIGTERM, SIGINT or SIGQUIT
*/
if (exiting)
{
ereport(DEBUG1,
(errmsg("failover handler called while exiting")));
kill(pcp_pid, SIGUSR2);
return;
}
/*
* processing fail over or switch over
*/
if (switching)
{
ereport(DEBUG1,
(errmsg("failover handler called while switching")));
kill(pcp_pid, SIGUSR2);
return;
}
Req_info->switching = true;
switching = 1;
for(;;)
{
POOL_REQUEST_KIND reqkind;
int queue_index;
int node_id_set[MAX_NUM_BACKENDS];
int node_count;
unsigned char request_details;
WDFailoverCMDResults wdInterlockingRes;
pool_semaphore_lock(REQUEST_INFO_SEM);
if(Req_info->request_queue_tail == Req_info->request_queue_head) /* request queue is empty*/
{
switching = 0;
Req_info->switching = false;
pool_semaphore_unlock(REQUEST_INFO_SEM);
break;
}
/* make a local copy of request */
Req_info->request_queue_head++;
queue_index = Req_info->request_queue_head % MAX_REQUEST_QUEUE_SIZE;
memcpy(node_id_set, Req_info->request[queue_index].node_id , (sizeof(int) * Req_info->request[queue_index].count));
reqkind = Req_info->request[queue_index].kind;
request_details = Req_info->request[queue_index].request_details;
node_count = Req_info->request[queue_index].count;
pool_semaphore_unlock(REQUEST_INFO_SEM);
ereport(DEBUG1,
(errmsg("failover handler"),
errdetail("kind: %d flags: %x node_count: %d index:%d", reqkind, request_details, node_count, queue_index)));
if (reqkind == CLOSE_IDLE_REQUEST)
{
kill_all_children(SIGUSR1);
continue;
}
/* start watchdog interlocking */
wdInterlockingRes = wd_start_failover_interlocking();
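/*
 * Note: in a watchdog cluster only one pgpool node (the interlocking
 * lock holder) actually executes the failover/failback commands; the
 * other nodes wait on the same locks until the holder is done. This is
 * what the FAILOVER_RES_I_AM_LOCK_HOLDER checks below implement.
 */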
/*
* if not in replication mode/master slave mode, we treat this a restart request.
* otherwise we need to check if we have already failovered.
*/
ereport(DEBUG1,
(errmsg("failover handler"),
errdetail("starting to select new master node")));
node_id = node_id_set[0];
/* failback request? */
if (reqkind == NODE_UP_REQUEST)
{
if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
(reqkind == NODE_UP_REQUEST && !(RAW_MODE &&
BACKEND_INFO(node_id).backend_status == CON_DOWN) && VALID_BACKEND(node_id)) ||
(reqkind == NODE_DOWN_REQUEST && !VALID_BACKEND(node_id)))
{
if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
ereport(LOG,
(errmsg("invalid failback request, node id: %d is invalid. node id must be between [0 and %d]",node_id,MAX_NUM_BACKENDS)));
else
ereport(LOG,
(errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback",BACKEND_INFO(node_id).backend_status,node_id)));
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
wd_end_failover_interlocking();
continue;
}
ereport(LOG,
(errmsg("starting fail back. reconnect host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
/* Check to see if all backends are down */
for (i=0;i<NUM_BACKENDS;i++)
{
if (BACKEND_INFO(i).backend_status != CON_DOWN &&
BACKEND_INFO(i).backend_status != CON_UNUSED)
{
ereport(LOG,
(errmsg("Node %d is not down (status: %d)",
i, BACKEND_INFO(i).backend_status)));
all_backend_down = false;
break;
}
}
BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT; /* unset down status */
(void)write_status_file();
/* Acquire failback start command lock */
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
{
trigger_failover_command(node_id, pool_config->failback_command,
MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
wd_failover_lock_release(FAILBACK_LOCK);
}
else
{
/*
* Okay we are not allowed to execute the failover command
* so we need to wait till the one who is executing the command
* finish with it.
*/
wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK);
}
}
else if (reqkind == PROMOTE_NODE_REQUEST)
{
if (node_id != -1 && VALID_BACKEND(node_id))
{
ereport(LOG,
(errmsg("starting promotion. promote host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
}
else
{
ereport(LOG,
(errmsg("failover: no backends are promoted")));
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
wd_end_failover_interlocking();
continue;
}
}
else /* NODE_DOWN_REQUEST */
{
int cnt = 0;
for (i = 0; i < node_count; i++)
{
if (node_id_set[i] != -1 &&
((RAW_MODE && VALID_BACKEND_RAW(node_id_set[i])) ||
VALID_BACKEND(node_id_set[i])))
{
ereport(LOG,
(errmsg("starting degeneration. shutdown host %s(%d)",
BACKEND_INFO(node_id_set[i]).backend_hostname,
BACKEND_INFO(node_id_set[i]).backend_port)));
BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN; /* set down status */
(void)write_status_file();
/* save down node */
nodes[node_id_set[i]] = 1;
cnt++;
}
}
if (cnt == 0)
{
ereport(LOG,
(errmsg("failover: no backends are degenerated")));
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
wd_end_failover_interlocking();
continue;
}
}
new_master = get_next_master_node();
if (new_master < 0)
{
ereport(LOG,
(errmsg("failover: no valid backends node found")));
}
ereport(DEBUG1, (errmsg("failover/failback request details: STREAM: %d reqkind: %d detail: %x node_id: %d",
STREAM, reqkind, request_details & REQ_DETAIL_SWITCHOVER,
node_id)));
/* On 2011/5/2 Tatsuo Ishii says: if mode is streaming replication
* and request is NODE_UP_REQUEST (failback case) we don't need to
* restart all children. Existing session will not use newly
* attached node, but load balanced node is not changed until this
* session ends, so it's harmless anyway.
*/
/*
* On 2015/9/21 Tatsuo Ishii says: this judgment is not sufficient if
* all backends were down. Child process has local status in which all
* backends are down. In this case even if new connection arrives from
* frontend, the child will not accept it because the local status
* shows all backends are down. For this purpose we refer to
* "all_backend_down" variable, which was set before updating backend status.
*
* See bug 248 for more details.
*/
if (STREAM && reqkind == NODE_UP_REQUEST && all_backend_down == false)
{
ereport(LOG,
(errmsg("Do not restart children because we are failbacking node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
need_to_restart_children = false;
partial_restart = false;
}
/*
* If the mode is streaming replication and the request is
* NODE_DOWN_REQUEST and it's actually a switch over request, we don't
* need to restart all children, except the node is primary.
*/
else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
{
ereport(LOG,
(errmsg("Do not restart children because we are switching over node id %d host: %s port: %d and we are in streaming replication mode", node_id,
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
need_to_restart_children = true;
partial_restart = true;
for (i = 0; i < pool_config->num_init_children; i++)
{
bool restart = false;
for (j=0;j<pool_config->max_pool;j++)
{
for (k=0;k<NUM_BACKENDS;k++)
{
ConnectionInfo *con = pool_coninfo(i, j, k);
if (con->connected && con->load_balancing_node == node_id)
{
ereport(LOG,
(errmsg("child pid %d needs to restart because pool %d uses backend %d",
process_info[i].pid, j, node_id)));
restart = true;
break;
}
}
}
if (restart)
{
pid_t pid = process_info[i].pid;
if (pid)
{
kill(pid, SIGQUIT);
ereport(DEBUG1,
(errmsg("failover handler"),
errdetail("kill process with PID:%d", pid)));
}
}
}
}
else
{
ereport(LOG,
(errmsg("Restart all children")));
/* kill all children */
for (i = 0; i < pool_config->num_init_children; i++)
{
pid_t pid = process_info[i].pid;
if (pid)
{
kill(pid, SIGQUIT);
ereport(DEBUG1,
(errmsg("failover handler"),
errdetail("kill process with PID:%d", pid)));
}
}
need_to_restart_children = true;
partial_restart = false;
}
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
{
/* Exec failover_command if needed */
for (i = 0; i < pool_config->backend_desc->num_backends; i++)
{
if (nodes[i])
trigger_failover_command(i, pool_config->failover_command,
MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
}
wd_failover_lock_release(FAILOVER_LOCK);
}
else
{
wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK);
}
/* no need to wait since it will be done in reap_handler */
#ifdef NOT_USED
while (wait(NULL) > 0)
;
if (errno != ECHILD)
ereport(LOG,
(errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));
#endif
if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
new_primary = node_id;
/*
* If the down node was a standby node in streaming replication
* mode, we can avoid calling find_primary_node_repeatedly() and
* recognize the former primary as the new primary node, which
* will reduce the time to process standby down.
*/
else if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE &&
reqkind == NODE_DOWN_REQUEST)
{
if (Req_info->primary_node_id != node_id)
new_primary = Req_info->primary_node_id;
else
new_primary = find_primary_node_repeatedly();
}
else
new_primary = find_primary_node_repeatedly();
/*
* If follow_master_command is provided and in master/slave
* streaming replication mode, we start degenerating all backends
* as they are not replicated anymore.
*/
int follow_cnt = 0;
if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
{
if (*pool_config->follow_master_command != '\0' ||
reqkind == PROMOTE_NODE_REQUEST)
{
/* only if the failover is against the current primary */
if (((reqkind == NODE_DOWN_REQUEST) &&
(nodes[Req_info->primary_node_id])) ||
((reqkind == PROMOTE_NODE_REQUEST) &&
(VALID_BACKEND(node_id))))
{
for (i = 0; i < pool_config->backend_desc->num_backends; i++)
{
/* do not degenerate the new primary */
if ((new_primary >= 0) && (i != new_primary)) {
BackendInfo *bkinfo;
bkinfo = pool_get_node_info(i);
ereport(LOG,
(errmsg("starting follow degeneration. shutdown host %s(%d)",
bkinfo->backend_hostname,
bkinfo->backend_port)));
bkinfo->backend_status = CON_DOWN; /* set down status */
(void)write_status_file();
follow_cnt++;
}
}
if (follow_cnt == 0)
{
ereport(LOG,
(errmsg("failover: no follow backends are degenerated")));
}
else
{
/* update new master node */
new_master = get_next_master_node();
ereport(LOG,
(errmsg("failover: %d follow backends have been degenerated", follow_cnt)));
}
}
}
}
/*
* follow master command also uses the same locks used by the triggering command
*/
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
{
if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
{
follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
Req_info->primary_node_id);
}
wd_failover_lock_release(FOLLOW_MASTER_LOCK);
}
else
{
wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK);
}
/* Save primary node id */
Req_info->primary_node_id = new_primary;
ereport(LOG,
(errmsg("failover: set new primary node: %d", Req_info->primary_node_id)));
if (new_master >= 0)
{
Req_info->master_node_id = new_master;
ereport(LOG,
(errmsg("failover: set new master node: %d", Req_info->master_node_id)));
}
/* Kill children and restart them if needed */
if (need_to_restart_children)
{
for (i=0;i<pool_config->num_init_children;i++)
{
/*
* Try to kill pgpool child because previous kill signal
* may not be received by pgpool child. This could happen
* if multiple PostgreSQL are going down (or even starting
* pgpool, without starting PostgreSQL can trigger this).
* Child calls degenerate_backend() and it tries to acquire
* semaphore to write a failover request. In this case the
* signal mask is set as well, thus signals are never
* received.
*/
bool restart = false;
if (partial_restart)
{
for (j=0;j<pool_config->max_pool;j++)
{
for (k=0;k<NUM_BACKENDS;k++)
{
ConnectionInfo *con = pool_coninfo(i, j, k);
if (con->connected && con->load_balancing_node == node_id)
{
ereport(LOG,
(errmsg("child pid %d needs to restart because pool %d uses backend %d",
process_info[i].pid, j, node_id)));
restart = true;
break;
}
}
}
}
else
restart = true;
if (restart)
{
if (process_info[i].pid)
{
kill(process_info[i].pid, SIGQUIT);
process_info[i].pid = fork_a_child(fds, i);
process_info[i].start_time = time(NULL);
}
}
else
process_info[i].need_to_restart = 1;
}
}
else
{
/* Set restart request to each child. Children will exit(1)
* whenever they are convenient.
*/
for (i=0;i<pool_config->num_init_children;i++)
{
process_info[i].need_to_restart = 1;
}
}
/*
* Send restart request to worker child.
*/
kill(worker_pid, SIGUSR1);
if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
wd_end_failover_interlocking();
if (reqkind == NODE_UP_REQUEST)
{
ereport(LOG,
(errmsg("failback done. reconnect host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
}
else if (reqkind == PROMOTE_NODE_REQUEST)
{
ereport(LOG,
(errmsg("promotion done. promoted host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
}
else
{
/* Temporary black magic. Without this regression 055 does not finish */
fprintf(stderr, "failover done. shutdown host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port);
ereport(LOG,
(errmsg("failover done. shutdown host %s(%d)",
BACKEND_INFO(node_id).backend_hostname,
BACKEND_INFO(node_id).backend_port)));
}
need_to_restart_pcp = true;
}
switching = 0;
Req_info->switching = false;
/* kick wakeup_handler in pcp_child to notice that
* failover/failback done
*/
kill(pcp_pid, SIGUSR2);
if(need_to_restart_pcp)
{
sleep(1);
/*
* Send restart request to pcp child.
*/
kill(pcp_pid, SIGUSR1);
for (;;)
{
sts = waitpid(pcp_pid, &status, 0);
if (sts != -1)
break;
if (sts == -1)
{
if (errno == EINTR)
continue;
else
{
ereport(WARNING,
(errmsg("failover: waitpid failed. reason: %s", strerror(errno))));
continue;
}
}
}
if (WIFSIGNALED(status))
ereport(LOG,
(errmsg("PCP child %d exits with status %d by signal %d in failover()", pcp_pid, status, WTERMSIG(status))));
else
ereport(LOG,
(errmsg("PCP child %d exits with status %d in failover()", pcp_pid, status)));
pcp_pid = pcp_fork_a_child(pcp_unix_fd, pcp_inet_fd, pcp_conf_file);
ereport(LOG,
(errmsg("fork a new PCP child pid %d in failover()", pcp_pid)));
}
}
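For completeness, the producer side of the request queue drained above lives in register_node_operation_request() (also in pgpool_main.c): it takes the semaphore, appends the request at the tail of the ring buffer, and signals the pgpool main process, whose handler eventually runs failover(). A simplified sketch of that enqueue path, with queue-full handling and error checks omitted:
/* Simplified sketch of the enqueue side; the real logic is
 * register_node_operation_request() in pgpool_main.c. */
static void
enqueue_node_request_sketch(POOL_REQUEST_KIND kind, int *node_id_set, int count)
{
    int index;

    pool_semaphore_lock(REQUEST_INFO_SEM);
    Req_info->request_queue_tail++;
    index = Req_info->request_queue_tail % MAX_REQUEST_QUEUE_SIZE;
    Req_info->request[index].kind = kind;
    memcpy(Req_info->request[index].node_id, node_id_set, sizeof(int) * count);
    Req_info->request[index].count = count;
    pool_semaphore_unlock(REQUEST_INFO_SEM);

    /* wake the pgpool main process (mypid, as checked at the top of
     * failover()); its signal handler leads to failover() running */
    kill(mypid, SIGUSR1);
}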