本文想解决的问题:
redis 如何感知并触发 key 过期的?
如何防止大规模的 key 同时过期,导致 redis 主循环阻塞在清理过期 key?
如何防止大 key 过期,导致 redis 主循环阻塞在清理大 key?
主动过期操作 activeExpireCycle
为什么要分为 beforesleep 期间执行的 FAST 循环 和 在定时任务中的 SLOW 循环?
源码存在 expireSlaveKeys
函数,所以从库会主动过期 key?
从库如何组织和识别 Writable-Slave 写入的 key 和 从主库同步的 key?
delGenericCommand
是删除 key 操作,为什么也需要检查 key 是否过期?理论上讲,key 是否过期对于删除操作来说,应该是无伤大雅的吧?
redis 如何感知并触发 key 过期的?
主动过期
- beforesleep: matser 在每次事件循环的
epoll_wait
之前执行的 beforeslepp 回调函数会执行一次 FAST 的主动过期循环。 更多关于 beforesleep 回调函数,详见 Redis 源码解读之事件循环机制- 定时任务: 频率为
server.hz
的定时事件的serverCron
回调函数会执行一次 SLOW 的主动过期循环(master), 或者执行从库写入 key 的过期(slave)。被动过期: 执行
scanGenericCommand
,delGenericCommand
,dbRandomKey
,lookupKeyWriteWithFlags
,lookupKeyReadWithFlags
操作前,会校验操作的 key 是否已经过期。如果已过期,则删除该 key(master), 或者不做操作(slave)。
- 每次执行清除超时 key 操作,会有执行时间,执行轮次等监控。当发现执行超时时间超过一定阈值,就会结束本轮清理工作。
可以启动
lazyfree_lazy_expire
,调用dbAsyncDelete()
函数使用 bio(Background I/O) 的 bio_lazy_free 线程池后台清理。当然,bio 又是另外一个故事了,有机会备好瓜子儿,咱细聊~
activeExpireCycle
为什么要分为 beforesleep 期间执行的 FAST 循环 和 在定时任务中的 SLOW 循环?
一般来说,
ACTIVE_EXPIRE_CYCLE_FAST
执行频率高,执行时间更短。正常情况下,定时任务执行
ACTIVE_EXPIRE_CYCLE_SLOW
操作,清理过期 key。这样做的好处是:在可接受范围内(server.stat_expired_stale_perc < config_cycle_acceptable_stale
), 通过较为保守的速度清理过期key,以达到节省 CPU 负载的目的。当过期 key 过多时,定时任务的
ACTIVE_EXPIRE_CYCLE_SLOW
操作在一个周期内无法让过期 key 减少到符合预期(server.stat_expired_stale_perc < config_cycle_acceptable_stale
)。这时候就需要启动 beforesleep 的ACTIVE_EXPIRE_CYCLE_FAST
操作。这样做的好处是:当过期 key 过多时,通过更为激进的方式来清理过期 key, 避免大量的过期 key 消耗过多的内存。The algorithm used is adaptive and will use few CPU cycles if there are few expiring keys, otherwise it will get more aggressive to avoid that too much memory is used by keys that can be removed from the keyspace.
expireSlaveKeys
函数,所以从库会主动过期 key?会的,但是不是从主库同步的key。
从主库同步的 key,过期时间由主库维护,主库同步 DEL 操作到从库。
从库如果是 READ-WRITE 模式,就可以继续写入数据。从库自己写入的数据就需要自己来维护其过期操作。
全局变量
dict *slaveKeysWithExpire
维护从库写入的 key 一个 bitmap。该 bitmap 记录在哪几个 DB 中存有从库写入的这个 key。比如:
((dictFind(slaveKeysWithExpire,keyname)->v.u64) & 1) == 1
则在编号 0 的 DB 存在keyname
这个 key。The dictionary has an SDS string representing the key as the hash table key, while the value is a 64 bit unsigned integer with the bits corresponding to the DB where the keys may exist set to 1.
delGenericCommand
是删除 key 操作,为什么也需要检查 key 是否过期?理论上讲,key 是否过期对于删除操作来说,应该是无伤大雅的吧?
delGenericCommand
源码如下。可以看到,删除操作会返回实际删除掉的 key 的数量。因此过期 key 需要过滤掉。/* This command implements DEL and LAZYDEL. */ void delGenericCommand(client *c, int lazy) { int numdel = 0, j; for (j = 1; j < c->argc; j++) { expireIfNeeded(c->db,c->argv[j]); int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { signalModifiedKey(c,c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; numdel++; } } addReplyLongLong(c,numdel); }
redisSever
部分字段在操作 key 过期过程中用到的部分 redisSever
的字段如下:
stat_expired_stale_perc
: 预估的过期 key 占 key 总数的比例。每次执行主动过期操作会统计遍历过程中过期 key 占遍历 key 总数的比例(current_perc = (double)total_expired/total_sampled
)。它与之前的 stat_expired_stale_perc
加权平均,就是新的预估的过期 key 的比例(server.stat_expired_stale_perc = (current_perc*0.05)+ (server.stat_expired_stale_perc*0.95)
)。
lazyfree_lazy_expire
: 是否开启 bio 删除过期大 key。
其他字段,见源码注释:
// in server.h
struct redisServer {
//... ...
long long stat_expiredkeys; /* Number of expired keys */
int active_expire_enabled; /* Can be disabled for testing purposes. */
double stat_expired_stale_perc; /* Percentage of keys probably expired */
long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/
int hz; /* serverCron() calls frequency in hertz *//* Lazy free */
int lazyfree_lazy_expire;
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
struct clusterState *cluster; /* State of the cluster */
/* Replication (slave) */
char *masterhost; /* Hostname of master */
int repl_slave_ro; /* Slave is read only? */
//... ...
}
过期 key 主要有两种方式删除, 一种是定时事件 serverCron
和 beforeSleep 触发的主动删除,另一种是在读取 key 过程中被动删除。
可以看到,定时任务会根据当前 redis 服务的状态来确定超时方式:
ACTIVE_EXPIRE_CYCLE_SLOW
操作。expireSlaveKeys()
删除 Writable-Slave 写入的过期 key。(弱弱地说一句,这类 key 的删除简单粗暴,没有 FAST/SLOW 的控制,单纯通过定时事件触发) beforeSleep
回调函数会在 master 每次进入 epoll_wait
之前确认是否需要进行 ACTIVE_EXPIRE_CYCLE_FAST
操作来减少过期 key 对内存的消耗。
activeExpireCycle
函数遍历 expire
字典,删除过期 key。同时完成预估未过期 key 的平均 ttl,过期操作耗时采样,预估过期 key 占所有 key 的比例等操作。删除过期 key 有两种方式, 同步删除和异步删除。当 server.lazyfree_lazy_expire == 1
, 会使用后台线程池 bio 实现异步删除过期大 key。(这里小挖一个坑,后面有空再介绍下 bio)。
propagateExpire
函数同步 DEL
操作到 AOF 文件和 slave。
// in expire.c
/* Helper function for the activeExpireCycle() function.
* This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database.
*
* If the key is found to be expired, it is removed from the database and
* 1 is returned. Otherwise no operation is performed and 0 is returned.
*
* When a key is expired, server.stat_expiredkeys is incremented.
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
decrRefCount(keyobj);
server.stat_expiredkeys++;
return 1;
} else {
return 0;
}
}
// in db.c
/* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
if (server.aof_state != AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* Every expire cycle tests multiple databases: the next call will start
* again from the next db, with the exception of exists for time limit: in that
* case we restart again from the last database we were processing. Anyway
* no more than CRON_DBS_PER_CALL databases are tested at every iteration.
*
* The function can perform more or less work, depending on the "type"
* argument. It can execute a "fast cycle" or a "slow cycle". The slow
* cycle is the main way we collect expired cycles: this happens with
* the "server.hz" frequency (usually 10 hertz).
*
* However the slow cycle can exit for timeout, since it used too much time.
* For this reason the function is also invoked to perform a fast cycle
* at every event loop cycle, in the beforeSleep() function. The fast cycle
* will try to perform less work, but will do it much more often.
*
* The following are the details of the two expire cycles and their stop
* conditions:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
* microseconds, and is not repeated again before the same amount of time.
* The cycle will also refuse to run at all if the latest slow cycle did not
* terminate because of a time limit condition.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
* fast cycle, the check of every database is interrupted once the number
* of already expired keys in the database is estimated to be lower than
* a given percentage, in order to avoid doing too much work to gain too
* little memory.
*
* The configured expire "effort" will modify the baseline parameters in
* order to do more work in both the fast and slow expire cycles.
*/
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
we do extra efforts. */
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;
unsigned long idx = db->expires_cursor;
idx &= db->expires->ht[table].sizemask;
dictEntry *de = db->expires->ht[table].table[idx];
long long ttl;
/* Scan the current bucket of the current table. */
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
if (activeExpireCycleTryExpire(db,e,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* We don't repeat the cycle for the current database if there are
* an acceptable amount of stale keys (logically expired but yet
* not reclaimed). */
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}
这个函数相对比较简单,直接上源码了。大概就是遍历检查 slaveKeysWithExpire
所标记的 Writable-Slave 写入的 key ,删除其中超时的 key。
/*-----------------------------------------------------------------------------
* Expires of keys created in writable slaves
*
* Normally slaves do not process expires: they wait the masters to synthesize
* DEL operations in order to retain consistency. However writable slaves are
* an exception: if a key is created in the slave and an expire is assigned
* to it, we need a way to expire such a key, since the master does not know
* anything about such a key.
*
* In order to do so, we track keys created in the slave side with an expire
* set, and call the expireSlaveKeys() function from time to time in order to
* reclaim the keys if they already expired.
*
* Note that the use case we are trying to cover here, is a popular one where
* slaves are put in writable mode in order to compute slow operations in
* the slave side that are mostly useful to actually read data in a more
* processed way. Think at sets intersections in a tmp key, with an expire so
* that it is also used as a cache to avoid intersecting every time.
*
* This implementation is currently not perfect but a lot better than leaking
* the keys as implemented in 3.2.
*----------------------------------------------------------------------------*/
/* The dictionary where we remember key names and database ID of keys we may
* want to expire from the slave. Since this function is not often used we
* don't even care to initialize the database at startup. We'll do it once
* the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
* is called.
*
* The dictionary has an SDS string representing the key as the hash table
* key, while the value is a 64 bit unsigned integer with the bits corresponding
* to the DB where the keys may exist set to 1. Currently the keys created
* with a DB id > 63 are not expired, but a trivial fix is to set the bitmap
* to the max 64 bit unsigned value when we know there is a key with a DB
* ID greater than 63, and check all the configured DBs in such a case. */
dict *slaveKeysWithExpire = NULL;
/* Check the set of keys created by the master with an expire set in order to
* check if they should be evicted. */
void expireSlaveKeys(void) {
if (slaveKeysWithExpire == NULL ||
dictSize(slaveKeysWithExpire) == 0) return;
int cycles = 0, noexpire = 0;
mstime_t start = mstime();
while(1) {
dictEntry *de = dictGetRandomKey(slaveKeysWithExpire);
sds keyname = dictGetKey(de);
uint64_t dbids = dictGetUnsignedIntegerVal(de);
uint64_t new_dbids = 0;
/* Check the key against every database corresponding to the
* bits set in the value bitmap. */
int dbid = 0;
while(dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = server.db+dbid;
dictEntry *expire = dictFind(db->expires,keyname);
int expired = 0;
if (expire &&
activeExpireCycleTryExpire(server.db+dbid,expire,start))
{
expired = 1;
}
/* If the key was not expired in this DB, we need to set the
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (expire && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}
}
dbid++;
dbids >>= 1;
}
/* Set the new bitmap as value of the key, in the dictionary
* of keys with an expire set directly in the writable slave. Otherwise
* if the bitmap is zero, we no longer need to keep track of it. */
if (new_dbids)
dictSetUnsignedIntegerVal(de,new_dbids);
else
dictDelete(slaveKeysWithExpire,keyname);
/* Stop conditions: found 3 keys we can't expire in a row or
* time limit was reached. */
cycles++;
if (noexpire > 3) break;
if ((cycles % 64) == 0 && mstime()-start > 1) break;
if (dictSize(slaveKeysWithExpire) == 0) break;
}
}
调用关系都很简单,大家看图应该很容易理解。
delGenericCommand
为什么也需要提前删除过期 key 在本文第一 part 已经提到,这里就不再赘述。 手机扫一扫
移动阅读更方便
你可能感兴趣的文章