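A friend recently brought up Redis Expire again, so I decided to write up a short summary once more.
Redis has a feature called Expire that lets you set a lifetime for a key. Once that lifetime has passed, the key disappears automatically, so you don't have to keep managing keys you no longer need. In practice, though, the memory is not released as readily as you might expect. That is because the memory held by an expired key is only freed in one of three situations:
- When a key whose expire time has passed is accessed.
- On every timer tick, without any command being run, a limited number of expired keys are deleted.
- When memory runs short, keys are deleted according to the memory policy until enough memory has been reclaimed.
Let's start with the first case (accessing a key whose expire time has passed).
Below is lookupKey, the function Redis uses when it accesses a key.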
/* Lookup a key for read or write operations, or return NULL if the key is not
* found in the specified DB. This function implements the functionality of
* lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
*
* Side-effects of calling this function:
*
* 1. A key gets expired if it reached it's TTL.
* 2. The key's last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): No special flags are passed.
* LOOKUP_NOTOUCH: Don't alter the last access time of the key.
* LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
* LOOKUP_NOSTATS: Don't increment key hits/misses counters.
* LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
* replicas, use separate keyspace stats and events (TODO)).
* LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key,
* so that we don't have to propagate the deletion.
*
* Note: this function also returns NULL if the key is logically expired but
* still existing, in case this is a replica and the LOOKUP_WRITE is not set.
* Even if the key expiry is master-driven, we can correctly report a key is
* expired on replicas even if the master is lagging expiring our key via DELs
* in the replication link. */
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
robj *val = NULL;
if (de) {
val = dictGetVal(de);
/* Forcing deletion of expired keys on a replica makes the replica
* inconsistent with the master. We forbid it on readonly replicas, but
* we have to allow it on writable replicas to make write commands
* behave consistently.
*
* It's possible that the WRITE flag is set even during a readonly
* command, since the command may trigger events that cause modules to
* perform additional writes. */
int is_ro_replica = server.masterhost && server.repl_slave_ro;
int expire_flags = 0;
if (flags & LOOKUP_WRITE && !is_ro_replica)
expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
if (flags & LOOKUP_NOEXPIRE)
expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
if (expireIfNeeded(db, key, expire_flags)) {
/* The key is no longer valid. */
val = NULL;
}
}
if (val) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_hits++;
/* TODO: Use separate hits stats for WRITE */
} else {
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_misses++;
/* TODO: Use separate misses stats and notify event for WRITE */
}
return val;
}
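Partway through, it calls expireIfNeeded, which deletes the key if it has expired. (On a replica it normally only reports the key as expired and waits for the DEL from the master, as the comment below explains.)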
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the master for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
*
* In masters as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns 1 if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the master, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return 0;
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
* the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on writable
* replicas.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time.
*
* When replicating commands from the master, keys are never considered
* expired. */
if (server.masterhost != NULL) {
if (server.current_client == server.master) return 0;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return 1;
}
/* In some cases we're explicitly instructed to return an indication of a
* missing key without actually deleting it, even on masters. */
if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
return 1;
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
* Typically, at the end of the pause we will properly expire the key OR we
* will have failed over and the new primary will send us the expire. */
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return 1;
/* Delete the key */
deleteExpiredKeyAndPropagate(db,key);
return 1;
}
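To make the lazy path concrete, here is a minimal sketch of the same idea in C. It is not the Redis implementation: `toy_entry`, `toy_is_expired`, and `toy_lookup` are hypothetical names made up for this post, the clock is passed in by the caller, and an `expire_at_ms` of -1 is assumed to mean "no TTL".

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy entry: one key slot with an optional expiry time. */
typedef struct toy_entry {
    const char *value;       /* NULL means the slot is empty */
    long long expire_at_ms;  /* absolute expiry in ms; -1 = no TTL (assumption) */
} toy_entry;

/* Returns 1 if the entry holds a value whose expiry time has passed. */
int toy_is_expired(const toy_entry *e, long long now_ms) {
    if (e->value == NULL || e->expire_at_ms < 0) return 0;
    return now_ms > e->expire_at_ms;
}

/* Lazy expiration: the entry is reclaimed only when somebody looks it up.
 * Returns the value, or NULL if the key is missing or was just expired. */
const char *toy_lookup(toy_entry *e, long long now_ms) {
    assert(e != NULL);
    if (toy_is_expired(e, now_ms)) {
        e->value = NULL;       /* "delete" the key on access */
        e->expire_at_ms = -1;
        return NULL;
    }
    return e->value;
}
```

The point of the sketch is that nothing happens to the entry until `toy_lookup` is called. A key that is never touched again keeps its memory, which is why this path alone is not enough.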
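Now the second case (on every timer tick, without any command being run, a limited number of expired keys are deleted).
Redis calls a function named serverCron at a fixed interval (server.hz times per second), and that is where the periodic housekeeping is done. As a side note, some of the metrics are collected here as well.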
/* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
*
* - Active expired keys collection (it is also performed in a lazy way on
* lookup).
* - Software watchdog.
* - Update some statistic.
* - Incremental rehashing of the DBs hash tables.
* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
* - Clients timeout of different kinds.
* - Replication reconnection.
* - Many more...
*
* Everything directly called here will be called server.hz times per second,
* so in order to throttle execution of things we want to do less frequently
* a macro is used: run_with_period(milliseconds) { .... }
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */
if (server.dynamic_hz) {
while (listLength(server.clients) / server.hz >
MAX_CLIENTS_PER_CLOCK_TICK)
{
server.hz *= 2;
if (server.hz > CONFIG_MAX_HZ) {
server.hz = CONFIG_MAX_HZ;
break;
}
}
}
/* for debug purposes: skip actual cron work if pause_cron is on */
if (server.pause_cron) return 1000/server.hz;
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
stat_net_input_bytes + stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
stat_net_output_bytes + stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
stat_net_repl_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
cronUpdateMemoryStats();
/* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap && !isShutdownInitiated()) {
int shutdownFlags = SHUTDOWN_NOFLAGS;
if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
shutdownFlags = server.shutdown_on_sigint;
else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
shutdownFlags = server.shutdown_on_sigterm;
if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
} else if (isShutdownInitiated()) {
if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
if (finishShutdown() == C_OK) exit(0);
/* Shutdown failed. Continue running. An error has been logged. */
}
}
/* Show some info about non-empty databases */
if (server.verbosity <= LL_VERBOSE) {
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
}
}
}
}
/* Show information about connected clients */
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_DEBUG,
"%lu clients connected (%lu replicas), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
/* We need to do a few operations on clients asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (!hasActiveChildProcess() &&
server.aof_rewrite_scheduled &&
!aofRewriteLimited())
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
run_with_period(1000) receiveChildInfo();
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
/* Just for the sake of defensive programming, to avoid forgetting to
* call this function when needed. */
updateDictResizePolicy();
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_flush_postponed_start)
{
flushAppendOnlyFile(0);
}
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_last_write_status == C_ERR)
{
flushAppendOnlyFile(0);
}
}
/* Clear the paused actions state if needed. */
updatePausedActions();
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth.
*
* If Redis is trying to failover then run the replication cron faster so
* progress on the handshake happens more quickly. */
if (server.failover_state != NO_FAILOVER) {
run_with_period(100) replicationCron();
} else {
run_with_period(1000) replicationCron();
}
/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
if (server.sentinel_mode) sentinelTimer();
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
/* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded();
/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
* executed changes the value via CONFIG SET, the server will perform
* the operation even if completely idle. */
if (server.tracking_clients) trackingLimitUsedSlots();
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress.
*
* Note: this code must be after the replicationCron() call above so
* make sure when refactoring this file to keep this order. This is useful
* because we want to give priority to RDB savings for replication. */
if (!hasActiveChildProcess() &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
run_with_period(100) {
if (moduleCount()) modulesCron();
}
/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);
server.cronloops++;
return 1000/server.hz;
}
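The run_with_period(milliseconds) blocks seen throughout serverCron are what let some tasks run less often than every tick. A rough sketch of that throttling idea in C follows. `toy_should_run` is a hypothetical helper written for this post, and the real macro in the Redis source may differ in detail.

```c
#include <assert.h>

/* Decide whether a task with the given period should run on this cron tick.
 * hz is the number of ticks per second; cronloops counts the ticks so far. */
int toy_should_run(int period_ms, int hz, long long cronloops) {
    assert(period_ms > 0 && hz > 0 && hz <= 1000);
    int tick_ms = 1000 / hz;             /* length of one tick in ms */
    if (period_ms <= tick_ms) return 1;  /* period fits in one tick: run every tick */
    /* Otherwise run once every (period_ms / tick_ms) ticks. */
    return cronloops % (period_ms / tick_ms) == 0;
}
```

With hz set to 10, a tick is 100 ms, so a 100 ms task runs on every tick, a 1000 ms task on every 10th tick, and a 5000 ms task on every 50th tick.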
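Of the functions called from serverCron above, databasesCron is the one that actually frees the memory. You can think of databasesCron as background maintenance: it removes expired keys, reduces memory fragmentation, and resizes and rehashes the DB hash tables, all to improve memory efficiency and keep lookups fast.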
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
Within it, activeExpireCycle is the function that handles expiration, just as the name suggests.
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* Every expire cycle tests multiple databases: the next call will start
* again from the next db. No more than CRON_DBS_PER_CALL databases are
* tested at every iteration.
*
* The function can perform more or less work, depending on the "type"
* argument. It can execute a "fast cycle" or a "slow cycle". The slow
* cycle is the main way we collect expired cycles: this happens with
* the "server.hz" frequency (usually 10 hertz).
*
* However the slow cycle can exit for timeout, since it used too much time.
* For this reason the function is also invoked to perform a fast cycle
* at every event loop cycle, in the beforeSleep() function. The fast cycle
* will try to perform less work, but will do it much more often.
*
* The following are the details of the two expire cycles and their stop
* conditions:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
* microseconds, and is not repeated again before the same amount of time.
* The cycle will also refuse to run at all if the latest slow cycle did not
* terminate because of a time limit condition.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
* fast cycle, the check of every database is interrupted once the number
* of already expired keys in the database is estimated to be lower than
* a given percentage, in order to avoid doing too much work to gain too
* little memory.
*
* The configured expire "effort" will modify the baseline parameters in
* order to do more work in both the fast and slow expire cycles.
*/
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
we do extra efforts. */
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Next DB to test. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
* Typically, at the end of the pause we will properly expire the key OR we
* will have failed over and the new primary will send us the expire. */
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
/* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;
unsigned long idx = db->expires_cursor;
idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]);
dictEntry *de = db->expires->ht_table[table][idx];
long long ttl;
/* Scan the current bucket of the current table. */
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
if (activeExpireCycleTryExpire(db,e,now)) {
expired++;
/* Propagate the DEL command */
postExecutionUnitOperations();
}
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* We don't repeat the cycle for the current database if there are
* an acceptable amount of stale keys (logically expired but yet
* not reclaimed). */
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}
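The code looks complicated, but in short it walks the hash table that holds the keys with an expire set, bucket by bucket, and deletes the ones that have expired. Since it cannot know in advance how many such keys there are, it tracks the elapsed time and counts the sampled and expired keys to decide how much more to delete. In summary, it behaves as follows.
Type | Behavior |
ACTIVE_EXPIRE_CYCLE_SLOW | Loops until the time limit (timelimit) is used up. Within each DB it samples a fixed number of keys per pass, and moves on to the next DB once the share of expired keys among the sampled ones is low enough. |
ACTIVE_EXPIRE_CYCLE_FAST | When type is FAST, the time limit is cut sharply (to config_cycle_fast_duration microseconds). Everything else is the same. |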
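To see what those limits come to in numbers, the sketch below redoes the parameter arithmetic from the top of activeExpireCycle as a standalone C function. The constants and formulas are copied from the code quoted above; the struct and function names (`toy_expire_params`, `toy_expire_params_for`, `toy_should_repeat`) are hypothetical and exist only for this illustration.

```c
#include <assert.h>

/* Hypothetical container for the values activeExpireCycle derives. */
typedef struct toy_expire_params {
    unsigned long keys_per_loop;     /* keys sampled per pass over a DB */
    unsigned long fast_duration_us;  /* time limit of a fast cycle */
    unsigned long slow_time_perc;    /* % of one hz period a slow cycle may use */
    unsigned long acceptable_stale;  /* % of expired keys tolerated per sample */
    long long slow_timelimit_us;     /* time limit of a slow cycle */
} toy_expire_params;

/* Same formulas as the quoted code: effort 1..10 is rescaled to 0..9. */
toy_expire_params toy_expire_params_for(int active_expire_effort, int hz) {
    assert(active_expire_effort >= 1 && active_expire_effort <= 10 && hz > 0);
    unsigned long effort = (unsigned long)active_expire_effort - 1;
    toy_expire_params p;
    p.keys_per_loop = 20 + 20 / 4 * effort;
    p.fast_duration_us = 1000 + 1000 / 4 * effort;
    p.slow_time_perc = 25 + 2 * effort;
    p.acceptable_stale = 10 - effort;
    p.slow_timelimit_us = (long long)p.slow_time_perc * 1000000 / hz / 100;
    if (p.slow_timelimit_us <= 0) p.slow_timelimit_us = 1;
    return p;
}

/* Loop condition of the per-DB do/while: keep going while too many of the
 * sampled keys turned out to be expired (or nothing was sampled at all). */
int toy_should_repeat(unsigned long expired, unsigned long sampled,
                      unsigned long acceptable_stale) {
    return sampled == 0 || (expired * 100 / sampled) > acceptable_stale;
}
```

With the default effort of 1 and hz of 10, this gives 20 keys per pass, a 1000 microsecond fast cycle, and a slow cycle limit of 25000 microseconds (25 ms per 100 ms tick). At effort 10 the same formulas give 65 keys per pass, 3250 microseconds for the fast cycle, and 43000 microseconds for the slow cycle, with the acceptable stale share dropping from 10% to 1%.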
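The third case is when memory is needed but there is not enough of it, so keys have to be deleted. Here the deletion follows the maxmemory-policy: depending on the policy, candidates are taken either from all keys or only from keys that have an expire set. This is handled in the performEvictions function.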
/* Check that memory usage is within the current "maxmemory" limit. If over
* "maxmemory", attempt to free memory by evicting data (if it's safe to do so).
*
* It's possible for Redis to suddenly be significantly over the "maxmemory"
* setting. This can happen if there is a large allocation (like a hash table
* resize) or even if the "maxmemory" setting is manually adjusted. Because of
* this, it's important to evict for a managed period of time - otherwise Redis
* would become unresponsive while evicting.
*
* The goal of this function is to improve the memory situation - not to
* immediately resolve it. In the case that some items have been evicted but
* the "maxmemory" limit has not been achieved, an aeTimeProc will be started
* which will continue to evict items until memory limits are achieved or
* nothing more is evictable.
*
* This should be called before execution of commands. If EVICT_FAIL is
* returned, commands which will result in increased memory usage should be
* rejected.
*
* Returns:
* EVICT_OK - memory is OK or it's not possible to perform evictions now
* EVICT_RUNNING - memory is over the limit, but eviction is still processing
* EVICT_FAIL - memory is over the limit, and there's nothing to evict
* */
int performEvictions(void) {
/* Note, we don't goto update_metrics here because this check skips eviction
* as if it wasn't triggered. it's a fake EVICT_OK. */
if (!isSafeToPerformEvictions()) return EVICT_OK;
int keys_freed = 0;
size_t mem_reported, mem_tofree;
long long mem_freed; /* May be negative */
mstime_t latency, eviction_latency;
long long delta;
int slaves = listLength(server.slaves);
int result = EVICT_FAIL;
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) {
result = EVICT_OK;
goto update_metrics;
}
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids. */
goto update_metrics;
}
unsigned long eviction_time_limit_us = evictionTimeLimitUs();
mem_freed = 0;
latencyStartMonitor(latency);
monotime evictionTimer;
elapsedStart(&evictionTimer);
/* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
static unsigned int next_db = 0;
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
struct evictionPoolEntry *pool = EvictionPoolLRU;
while (bestkey == NULL) {
unsigned long total_keys = 0, keys;
/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
if (!total_keys) break; /* No keys to evict. */
/* Go backward from best to worst element to evict. */
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[bestdbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[bestdbid].expires,
pool[k].key);
}
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;
/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}
/* Finally remove the selected key. */
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* Same for CSC invalidation messages generated by signalModifiedKey.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL,db,keyobj);
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;
if (keys_freed % 16 == 0) {
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the replicas fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time. */
if (server.lazyfree_lazy_eviction) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
break;
}
}
/* After some time, exit the loop early - even if memory limit
* hasn't been reached. If we suddenly need to free a lot of
* memory, don't want to spend too much time here. */
if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
// We still need to free memory - start eviction timer proc
startEvictionTimeProc();
break;
}
}
} else {
goto cant_free; /* nothing to free... */
}
}
/* at this point, the memory is OK, or we have reached the time limit */
result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;
cant_free:
if (result == EVICT_FAIL) {
/* At this point, we have run out of evictable items. It's possible
* that some items are being freed in the lazyfree thread. Perform a
* short wait here if such jobs exist, but don't wait long. */
mstime_t lazyfree_latency;
latencyStartMonitor(lazyfree_latency);
while (bioPendingJobsOfType(BIO_LAZY_FREE) &&
elapsedUs(evictionTimer) < eviction_time_limit_us) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
result = EVICT_OK;
break;
}
usleep(eviction_time_limit_us < 1000 ? eviction_time_limit_us : 1000);
}
latencyEndMonitor(lazyfree_latency);
latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
update_metrics:
if (result == EVICT_RUNNING || result == EVICT_FAIL) {
if (server.stat_last_eviction_exceeded_time == 0)
elapsedStart(&server.stat_last_eviction_exceeded_time);
} else if (result == EVICT_OK) {
if (server.stat_last_eviction_exceeded_time != 0) {
server.stat_total_eviction_exceeded_time += elapsedUs(server.stat_last_eviction_exceeded_time);
server.stat_last_eviction_exceeded_time = 0;
}
}
return result;
}
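The code as a whole is complicated, but the key part is the loop below. Depending on server.maxmemory_policy, eviction candidates are sampled either from db->dict, which holds every key, or from db->expires, which holds only the keys that have an expire set.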
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
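The choice between the two dictionaries comes down to a single flag bit in the policy value. Below is a minimal sketch of that decision. The flag and policy values are modeled on the definitions in Redis's server.h and should be treated as assumptions; `eviction_sample_source` and the `sample_source` enum are hypothetical names introduced only for this illustration.

```c
#include <assert.h>

/* Flag bits and policy values modeled on Redis's server.h.
 * The exact numbers are an assumption of this sketch. */
#define MAXMEMORY_FLAG_LRU     (1<<0)
#define MAXMEMORY_FLAG_LFU     (1<<1)
#define MAXMEMORY_FLAG_ALLKEYS (1<<2)

#define MAXMEMORY_VOLATILE_LRU    ((0<<8)|MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU    ((1<<8)|MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_VOLATILE_TTL    (2<<8)
#define MAXMEMORY_VOLATILE_RANDOM (3<<8)
#define MAXMEMORY_ALLKEYS_LRU     ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_LFU     ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_RANDOM  ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_NO_EVICTION     (7<<8)

/* Hypothetical helper type: where eviction candidates come from. */
enum sample_source { SAMPLE_FROM_DICT, SAMPLE_FROM_EXPIRES, SAMPLE_NONE };

/* Hypothetical helper: mirrors the "dict or expires" ternary in the loop. */
enum sample_source eviction_sample_source(int policy) {
    assert(policy >= 0);
    if (policy == MAXMEMORY_NO_EVICTION) return SAMPLE_NONE;
    return (policy & MAXMEMORY_FLAG_ALLKEYS) ? SAMPLE_FROM_DICT
                                             : SAMPLE_FROM_EXPIRES;
}
```

Under a volatile-* policy, a key without an expire is never a candidate, so an instance full of non-expiring keys can run out of things to evict even though it holds plenty of data.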
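This performEvictions function is called from processCommand, the function that handles every incoming command, so when maxmemory is configured the memory limit is checked before each command runs.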
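To make that call order concrete, here is a toy model of it. This is not Redis code: `toy_server`, `toy_perform_evictions`, `toy_process_command` and the enum values are all hypothetical, and the real processCommand performs many more checks. The sketch only assumes that eviction is attempted before a command runs, and that a command which may grow memory is rejected when eviction could not bring usage under the limit.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical result codes for this toy model. */
enum { TOY_EVICT_OK, TOY_EVICT_FAIL };
enum { CMD_EXECUTED, CMD_REJECTED_OOM };

/* Hypothetical, heavily simplified server state. */
struct toy_server {
    size_t maxmemory;  /* 0 means "no limit" */
    size_t used;       /* bytes currently in use */
    size_t evictable;  /* bytes that eviction could reclaim */
};

/* Free just enough evictable bytes to get back under maxmemory. */
int toy_perform_evictions(struct toy_server *s) {
    assert(s != NULL);
    if (s->used <= s->maxmemory) return TOY_EVICT_OK;
    size_t excess = s->used - s->maxmemory;
    if (excess > s->evictable) {
        /* Not enough evictable data: free what we can and report failure. */
        s->used -= s->evictable;
        s->evictable = 0;
        return TOY_EVICT_FAIL;
    }
    s->used -= excess;
    s->evictable -= excess;
    return TOY_EVICT_OK;
}

/* Eviction runs first; only then is the command allowed or rejected. */
int toy_process_command(struct toy_server *s, int may_grow_memory) {
    if (s->maxmemory != 0) {
        int out_of_memory = (toy_perform_evictions(s) == TOY_EVICT_FAIL);
        if (out_of_memory && may_grow_memory) return CMD_REJECTED_OOM;
    }
    return CMD_EXECUTED;
}
```

In this model a read-only command still runs when the server is over the limit, while a write is refused, which matches the general idea that eviction work is paid for by whichever client happens to send the next command.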
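That sounds like memory should be well managed, but one more problem remains. When you actually operate Redis, you will run into cases where keys disappear through expire, or you even remove a large number of keys with DEL, and yet far less memory is released than you expected. The reason is that Redis leaves memory management to its allocator (jemalloc by default on Linux builds), so the physical memory usage depends on how jemalloc allocates memory pages and when it returns them to the OS.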
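As a simple example, suppose we delete a lot of data, but the freed regions are not contiguous. A 4 KB page (or a larger one) may have had most of its data removed, yet it cannot be returned as long as anything live remains in it. If the next allocation needs more space than those holes offer, the allocator keeps requesting new pages. We call this fragmentation. This is why deleting with FLUSHALL or FLUSHDB gives the memory back right away, whereas deleting keys individually, no matter how many, often does not bring the process's memory usage down. Keep this in mind.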
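The effect can be illustrated with a toy model. This is a sketch under simplifying assumptions, not how jemalloc is actually implemented: memory is a flat array of fixed 64-byte slots, a 4096-byte page can be returned only when every slot in it is free, and all names (`pages_in_use`, `live_bytes`, `frag_ratio`) are hypothetical. The ratio computed here follows the same idea as mem_fragmentation_ratio in INFO memory, that is, resident memory divided by memory in use.

```c
#include <assert.h>
#include <stddef.h>

#define PAGE_SIZE 4096
#define OBJ_SIZE 64
#define OBJS_PER_PAGE (PAGE_SIZE / OBJ_SIZE)

/* live[i] is nonzero if slot i still holds an object.
 * A page counts as "in use" if at least one of its slots is live. */
size_t pages_in_use(const unsigned char *live, size_t nslots) {
    size_t pages = 0;
    for (size_t start = 0; start < nslots; start += OBJS_PER_PAGE) {
        size_t end = start + OBJS_PER_PAGE;
        if (end > nslots) end = nslots;
        for (size_t i = start; i < end; i++) {
            if (live[i]) { pages++; break; }
        }
    }
    return pages;
}

/* Bytes occupied by live objects only. */
size_t live_bytes(const unsigned char *live, size_t nslots) {
    size_t n = 0;
    for (size_t i = 0; i < nslots; i++)
        if (live[i]) n += OBJ_SIZE;
    return n;
}

/* Resident bytes divided by bytes actually in use. */
double frag_ratio(size_t resident_bytes, size_t used_bytes) {
    assert(used_bytes > 0);
    return (double)resident_bytes / (double)used_bytes;
}
```

With 10 pages of objects, deleting everything except one object per page frees over 98% of the data while all 10 pages stay resident. Deleting every object in five of the pages frees less data but actually lets those five pages go.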
If you want something even more interesting, try looking up the Defrag-related functions…