[입 개발] memcached slab 구조

slab

MAX_NUMBER_OF_SLAB_CLASSES = 63 + 1로 정의됨

#!c
static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];

slabclass_t 구조체는 다음과 같다. slabclass 에서 list_size 는
slab_list 의 capacity 이다. 그리고 slabs는 현재 몇개의 slab이
할당되었는지는 나타낸다.(현재의 length)

#!c
typedef struct {
unsigned int size; /* sizes of items */
unsigned int perslab; /* how many items per slab */

void *slots; /* list of item ptrs */
unsigned int sl_curr; /* total free items in list */

unsigned int slabs; /* how many slabs were allocated for this class */

void **slab_list; /* array of slab pointers */
unsigned int list_size; /* size of prev array */

size_t requested; /* The number of requested bytes */
} slabclass_t;

memcache 는 시작시에 factor 값에 의해서 사이즈 Range 별로 slabclass id를 가지도록
설정한다. slabs_preallocate 을 이용해서 미리 메모리를 할당해둘 수 도 있다.
이걸 이용하지 않으면 chunk 가 필요할 때 chunk를 할당한다. 기본 chunk size는 1MB이다.

#!c
#define CHUNK_ALIGN_BYTES 8

CHUNK_ALIGN_BYTES = 8

#!c
void slabs_init(const size_t limit, const double factor, const bool prealloc) {
int i = POWER_SMALLEST - 1;
//default chunk_size = 48
//sizeof(item) = 48
//factor = 1.25
//item_size_max = 1024 * 1024
//POWER_SMALLEST = 1

unsigned int size = sizeof(item) + settings.chunk_size;

mem_limit = limit;

if (prealloc) {
/* Allocate everything in a big chunk with malloc */
mem_base = malloc(mem_limit);
if (mem_base != NULL) {
mem_current = mem_base;
mem_avail = mem_limit;
} else {
fprintf(stderr, "Warning: Failed to allocate requested memory in"
" one large chunk.\nWill allocate in smaller chunks\n");
}
}

memset(slabclass, 0, sizeof(slabclass));

while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1 && size <= settings.item_size_max / factor) {
/* Make sure items are always n-byte aligned */
if (size % CHUNK_ALIGN_BYTES)
size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

slabclass[i].size = size;
slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
size *= factor;
if (settings.verbose > 1) {
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}
}

power_largest = i;
slabclass[power_largest].size = settings.item_size_max;
slabclass[power_largest].perslab = 1;
if (settings.verbose > 1) {
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}

/* for the test suite: faking of how much we've already malloc'd */
{
char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
if (t_initial_malloc) {
mem_malloced = (size_t)atol(t_initial_malloc);
}

}

if (prealloc) {
slabs_preallocate(power_largest);
}
}

slab size 초기화는 다음과 같이 이루어진다. 이 의미는 size가 96까지를 다루는 slabclass
는 그 안에 10922개의 아이템을 저장할 수 있다는 의미이다.

size: 96 perslab: 10922
size: 120 perslab: 8738
size: 152 perslab: 6898
size: 192 perslab: 5461
size: 240 perslab: 4369
size: 304 perslab: 3449
size: 384 perslab: 2730
size: 480 perslab: 2184
size: 600 perslab: 1747
size: 752 perslab: 1394
size: 944 perslab: 1110
size: 1184 perslab: 885
size: 1480 perslab: 708
size: 1856 perslab: 564
size: 2320 perslab: 451
size: 2904 perslab: 361
size: 3632 perslab: 288
size: 4544 perslab: 230
size: 5680 perslab: 184
size: 7104 perslab: 147
size: 8880 perslab: 118
size: 11104 perslab: 94
size: 13880 perslab: 75
size: 17352 perslab: 60
size: 21696 perslab: 48
size: 27120 perslab: 38
size: 33904 perslab: 30
size: 42384 perslab: 24
size: 52984 perslab: 19
size: 66232 perslab: 15
size: 82792 perslab: 12
size: 103496 perslab: 10
size: 129376 perslab: 8
size: 161720 perslab: 6
size: 202152 perslab: 5
size: 252696 perslab: 4
size: 315872 perslab: 3
size: 394840 perslab: 2
size: 493552 perslab: 2
size: 616944 perslab: 1
size: 771184 perslab: 1

이렇게 만들어진 slabclass 는 slabs_clsid 를 통해서 접근 할 수 있다. 인덱스를
하나씩 증가하면 적절한 크기의 slabclass를 선택한다.

#!c
unsigned int slabs_clsid(const size_t size) {
int res = POWER_SMALLEST;

if (size == 0)
return 0;
while (size > slabclass[res].size)
if (res++ == power_largest) /* won't fit in the biggest slab */
return 0;
return res;
}

slabclass는 item 과도 관계가 있다. 실제 slab_alloc 은 item_alloc 이 호출될때
호출되게 된다.

#!c
typedef unsigned int rel_time_t;

typedef struct _stritem {
/* Protected by LRU locks */
struct _stritem *next;
struct _stritem *prev;
/* Rest are protected by an item lock */
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
/* this odd type prevents type-punning issues when we do
* the little shuffle to save space when not using CAS. */
union {
uint64_t cas;
char end;
} data[];
/* if it_flags & ITEM_CAS we have 8 bytes CAS */
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

최초 item 할당시에 당연히 slab 도 없기 때문에, do_slabs_newslab()를 호출하게 된다.
SLAB_GLOBAL_PAGE_POOL 는 Reassignment를 위한 것이므로 최초에는 slabclass만 존재하고
내부에 할당된 slab은 없음. get_page_from_global_pool() 에서도 데이터가 없으므로 실제
메모리 할당은 memory_allocate 에 의해서 이루어진다.

#!c
static int do_slabs_newslab(const unsigned int id) {
slabclass_t *p = &slabclass[id];
slabclass_t *g = &slabclass[SLAB_GLOBAL_PAGE_POOL];
int len = settings.slab_reassign ? settings.item_size_max
: p->size * p->perslab;
char *ptr;

if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0
&& g->slabs == 0)) {
mem_limit_reached = true;
MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
return 0;
}

if ((grow_slab_list(id) == 0) ||
(((ptr = get_page_from_global_pool()) == NULL) &&
((ptr = memory_allocate((size_t)len)) == 0))) {

MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
return 0;
}

memset(ptr, 0, (size_t)len);
split_slab_page_into_freelist(ptr, id);

p->slab_list[p->slabs++] = ptr;
MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

return 1;
}

할당된 메모리 ptr은 split_slab_page_into_freelist 에 의해서 초기화 된다.

#!c
static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
slabclass_t *p = &slabclass[id];
int x;
for (x = 0; x < p->perslab; x++) {
do_slabs_free(ptr, 0, id);
ptr += p->size;
}
}

p->size는 slabclass item의 크기이므로 그 값만큼 증가하면서 do_slabs_free 를 호출해서
item을 저장할 수 있는 형태로 정보를 저장한다. ptr 이 p->size 만큼 계속 증가하는데 주목하자.

#!c
static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
slabclass_t *p;
item *it;

assert(id >= POWER_SMALLEST && id <= power_largest);
if (id < POWER_SMALLEST || id > power_largest)
return;

MEMCACHED_SLABS_FREE(size, id, ptr);
p = &slabclass[id];

it = (item *)ptr;
it->it_flags = ITEM_SLABBED;
it->slabs_clsid = 0;
it->prev = 0;
it->next = p->slots;
if (it->next) it->next->prev = it;
p->slots = it;

p->sl_curr++;
p->requested -= size;
return;
}

p는 slabclass, it는 해당 ptr의 메모리 영역이다. p->slots는 기존에 할당된 item list를
가리키는 포인터이다. 즉 it->next 로 현재의 it->next 에 기존의 item list를 저장하고,
기존의 item list의 prev는 새롭게 추가될 it가 된다. 그리고 다시 p->slots은 it를 가리키게
되므로, linked list로 slabclass에 할당되는 모든 item들이 double linked list 형식으로
저장되게 된다.(실제로 메모리 할당은 item_max_size 형태로 할당되지만… 논리적으로 이어진다.)

[입 개발] Redis 지표 중에 instantaneous 정보들은 어떤걸까?

Redis 에서 info를 해보면 여러가지 정보들이 있습니다.

# Stats
total_connections_received:1
total_commands_processed:0
instantaneous_ops_per_sec:0
total_net_input_bytes:6
total_net_output_bytes:0
total_net_repl_input_bytes:0
total_net_repl_output_bytes:0
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
instantaneous_input_repl_kbps:0.00
instantaneous_output_repl_kbps:0.00
rejected_connections:0
sync_full:0
sync_partial_ok:0
sync_partial_err:0
expired_keys:0
expired_stale_perc:0.00
expired_time_cap_reached_count:0
expire_cycle_cpu_milliseconds:0
evicted_keys:0
evicted_clients:0
total_eviction_exceeded_time:0
current_eviction_exceeded_time:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:0
pubsub_patterns:0
pubsubshard_channels:0
latest_fork_usec:0
total_forks:0
migrate_cached_sockets:0
slave_expires_tracked_keys:0
active_defrag_hits:0
active_defrag_misses:0
active_defrag_key_hits:0
active_defrag_key_misses:0
total_active_defrag_time:0
current_active_defrag_time:0
tracking_total_keys:0
tracking_total_items:0
tracking_total_prefixes:0
unexpected_error_replies:0
total_error_replies:0
dump_payload_sanitizations:0
total_reads_processed:1
total_writes_processed:0
io_threaded_reads_processed:0
io_threaded_writes_processed:0
reply_buffer_shrinks:1
reply_buffer_expands:0

위의 정보들을 보면, 여러가지 정보가 있지만, instantaneous_ops_per_sec 이라는 항목처럼 instantaneous_ 라는 접두어로 시작하는 항목들이 있습니다. instantaneous 의 번역을 보면 즉각적인 내용이 보입니다. 즉각적이라 현재 정보만 보여주는 걸까요?

해당 내용을 보게 된 것은 Redis 이전을 고민하는데 네트웍 사용량이 얼마나 되는지를 분석해보고 싶어하는 분이 계셨기 때문입니다. 아래 처럼 input_kbps, output_kbps 등이 보이는데 해당 값이 맞지 않는다는 느낌이었던거죠.

instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
instantaneous_input_repl_kbps:0.00
instantaneous_output_repl_kbps:0.00

구글링 해보면 제대로 맞지 않는다는 질문도 있습니다. https://groups.google.com/g/redis-db/c/Bv0KO1aSO6k?fbclid=IwAR3RvskpiNMB5ITcc5s6hD8KpadLUSDjYVwyGjVeqnn5xgabJGLS3Np2wM8&pli=1

hi :
    I want to monitor redis traffic through redis info.But I found instantaneous_input_kbps and instantaneous_output_kbps are not the true traffic about redis.
Instantaneous_input_kbps is smaller than the true traffic.

Is there something wrong?

######################sending data to redis
#network traffic
Time              -------------traffic------------ 
Time               bytin  bytout   pktin  pktout   
26/02/16-16:30:25   1.2M  310.0K    5.5K    3.9K   
26/02/16-16:30:27   1.2M  314.2K    5.5K    3.9K   
26/02/16-16:30:29   1.1M  321.5K    5.4K    3.9K   
26/02/16-16:30:31   1.2M  313.7K    5.5K    3.9K   
26/02/16-16:30:33   1.3M  321.0K    5.6K    4.0K   
26/02/16-16:30:35   1.2M  308.2K    5.3K    3.8K   
26/02/16-16:30:37   1.2M  308.9K    5.4K    3.8K   


####redis info
instantaneous_input_kbps:678.29
instantaneous_output_kbps:4.85
instantaneous_input_kbps:666.07
instantaneous_output_kbps:4.79
instantaneous_input_kbps:676.34
instantaneous_output_kbps:4.85
instantaneous_input_kbps:675.49
instantaneous_output_kbps:4.84
instantaneous_input_kbps:671.42
instantaneous_output_kbps:4.82
instantaneous_input_kbps:667.50
instantaneous_output_kbps:4.80
instantaneous_input_kbps:666.37
instantaneous_output_kbps:4.79

#####################stop sending data
Time              -------------traffic------------ 
Time               bytin  bytout   pktin  pktout   
26/02/16-16:34:59 157.5K   22.7K  226.00  228.00   
26/02/16-16:35:01 159.8K   24.9K  235.00  237.00   
26/02/16-16:35:03 208.9K  134.5K  901.00  947.00   
26/02/16-16:35:05 133.3K   61.1K  285.00  269.00   
^C

##redis info
instantaneous_output_kbps:1.17
instantaneous_input_kbps:0.01
instantaneous_output_kbps:1.17
instantaneous_input_kbps:0.01
instantaneous_output_kbps:1.16
instantaneous_input_kbps:0.01
instantaneous_output_kbps:1.17
instantaneous_input_kbps:0.01
instantaneous_output_kbps:1.17
instantaneous_input_kbps:0.01
instantaneous_output_kbps:1.17

일단 결론부터 말하자면, 앞에 instantaneous 가 붙는 값들은 정확한 값이 아니라 샘플링을 통해서 몇개씩 저장해서 확인하는 값입니다. 계속 기록되고 있는게 아니라, 순간 순간의 값들을 샘플링해서 이에 대한 정보를 보여줍니다.

일단 해당 정보를 가져오는 것을 확인해 봅시다.

info 함수를 만드는 곳을 보면 다음과 같이 가져오고 있습니다. 해당 정보를 가져오는 곳을 보면 아래와 같이 getInstantaneousMetric 함수를 통해서 정보를 가져오고 있습니다.

getInstantaneousMetric(STATS_METRIC_COMMAND)

그리고 해당 함수를 가보면 다음과 같이 간단하게 구현되어 있습니다.

/* Return the mean of all the samples. */
long long getInstantaneousMetric(int metric) {
    int j;
    long long sum = 0;

    for (j = 0; j < STATS_METRIC_SAMPLES; j++)
        sum += server.inst_metric[metric].samples[j];
    return sum / STATS_METRIC_SAMPLES;
}

코드를 보면 server.inst_metric[metric]./samples[j] 값을 가져와서 sum을 한 다음에 평균을 만들어서 전달하고 있습니다. 이제 그럼 저 server.inst_metric 값을 저장하는 곳을 확인해 봅시다.

크게 이동 하지 않고 바로 위에 trackInstantaneousMetric 라는 함수가 있습니다.

/* Add a sample to the operations per second array of samples. */
void trackInstantaneousMetric(int metric, long long current_reading) {
    long long now = mstime();
    long long t = now - server.inst_metric[metric].last_sample_time;
    long long ops = current_reading -
                    server.inst_metric[metric].last_sample_count;
    long long ops_sec;

    ops_sec = t > 0 ? (ops*1000/t) : 0;

    server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
        ops_sec;
    server.inst_metric[metric].idx++;
    server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
    server.inst_metric[metric].last_sample_time = now;
    server.inst_metric[metric].last_sample_count = current_reading;
}

redis 는 매 Tick 마다 serverCron 이라는 작업을 호출합니다. 그리고 그 안에 매 100ms 마다. 아래와 같이 sampling 정보를 저장합니다.

    run_with_period(100) {
        long long stat_net_input_bytes, stat_net_output_bytes;
        long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
        atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
        atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
        atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
        atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);

        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                stat_net_input_bytes + stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                stat_net_output_bytes + stat_net_repl_output_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
                                 stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
                                 stat_net_repl_output_bytes);
    }

여기서 볼 것은 Sampling 방식입니다. network에서 packet 을 읽거나 쓸 때마다 stat_net_input_bytes 나 stat_net_output_bytes 값이 증가하게 되고, 해당 시점을 값을 100ms 마다 저장해서 이 값의 차이를 Sample 로 저장하게 됩니다.

예를 들어, stat_net_input_bytes 는 readQueryFromClient 함수안에서 다음과 같이 패킷을 읽을 때 마다 증가하게 됩니다.

    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) {
        c->read_reploff += nread;
        atomicIncr(server.stat_net_repl_input_bytes, nread);
    } else {
        atomicIncr(server.stat_net_input_bytes, nread);
    }

오늘은 간단히 redis 에서 샘플링 데이터를 어떻게 저장하고 보여주는지를 분석해 봤습니다.

[입 개발] Spring 의 CacheEvict 에서 allEntries=true 는 Redis에서 어떻게 동작하게 될까?

최근에 우연히 아는 분의 글을 보다가 갑자기 궁금함이 생겼습니다. 네이버 클라우드의 Redis를 사용중인데 CacheEvict 에서 allEntries = true 를 줬을 경우, 동작이 실패한다는 것이었습니다. Spring에서 Cache를 쉽게 제공하는 방법중에 @Cacheable, @CachePut @CacheEvict 를 제공합니다. (물론 저는 이걸 잘 모르지만…) 그 중에 Cache를 정리할 때 사용하는 CacheEvict 은 allEntries 라는 옵션이 있는데 이게 true가 되면 어떻게 동작할까? 라는 의문이었습니다. (일단 실무에서는 사용하지 않는 것이 좋은 옵션으로 보입니다.)

여기서 먼저 Evict 에 대해서는 Cache 자체를 지운다는 의미보다는, 메모리가 부족해서 더 이상 캐시할 수 없을 때, 메모리를 확보하기 위해서, 기존 캐시된 데이터를 지우는 것을 의미합니다.

그런데 저도 저 옵션을 본 기억도 없고(정말인가~~~~) 저게 뜨면 어떻게 되지라는 게 궁금해서 일단 잠시 살펴보게 되었습니다. 먼저 CacheEvict.java 는 다음과 같이 구성되어 있습니다. 일단 주석은 다 지웁니다.

package org.springframework.cache.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface CacheEvict {
    @AliasFor("cacheNames")
    String[] value() default {};

    @AliasFor("value")
    String[] cacheNames() default {};

    String key() default "";

    String keyGenerator() default "";

    String cacheManager() default "";

    String cacheResolver() default "";    

    String condition() default "";

    boolean allEntries() default false;

    boolean beforeInvocation() default false;
}

보시면 아시겠지만 allEntries() 는 boolean 형태의 정보를 저장하고 있습니다. 그런데 사실 우리가 알아보고자 하는 부분은 Redis에서 어떻게 동작하는 가 이므로, spring-data-redis 에서 어떻게 동작하는지를 알아야 합니다.

그런데 spring-data-redis에서는 allEntries 라는 정보를 확인하는 부분이 없습니다. 이 이야기는 결국 뭔가 다른 변수로 전환된다는 것이죠. 그래서 spring-framework 에서 allEntries 를 검색해봅니다. 다른 부분은 전부 사용하는 곳인데 다음과 같은 코드가 발견이 됩니다.

spring-context/src/main/java/org/springframework/cache/annotation/SpringCacheAnnotationParser.java:151:         builder.setCacheWide(cacheEvict.allEntries());

이제 SpringCacheAnnotationParser.java 를 살펴봅니다. 다른 부분 보다는 setCacheWide 라는 부분이 보이는 군요. allEntries 가 cacheWide 라는 변수로 변환되는 걸 알 수 있습니다.

    private CacheEvictOperation parseEvictAnnotation(
            AnnotatedElement ae, DefaultCacheConfig defaultConfig, CacheEvict cacheEvict) {

        CacheEvictOperation.Builder builder = new CacheEvictOperation.Builder();

        builder.setName(ae.toString());
        builder.setCacheNames(cacheEvict.cacheNames());
        builder.setCondition(cacheEvict.condition());
        builder.setKey(cacheEvict.key());
        builder.setKeyGenerator(cacheEvict.keyGenerator());
        builder.setCacheManager(cacheEvict.cacheManager());
        builder.setCacheResolver(cacheEvict.cacheResolver());
        builder.setCacheWide(cacheEvict.allEntries());
        builder.setBeforeInvocation(cacheEvict.beforeInvocation());

        defaultConfig.applyDefault(builder);
        CacheEvictOperation op = builder.build();
        validateCacheOperation(ae, op);

        return op;
    }

이제 cacheWide로 한번 검색해봅니다. spring-context/src/main/java/org/springframework/cache/interceptor/CacheEvictOperation.java 라는 파일에서 다음과 같은 코드를 사용합니다. 아래 코드를 보면 이제 내부에서는 아마도 isCacheWide 라는 함수로 사용이 될꺼라고 예상이 됩니다.

    public boolean isCacheWide() {
        return this.cacheWide;
    }

isCacheWide() 함수를 호출하는 곳이 한 군데 밖에 없다는 것을 확인할 수 있습니다.

spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java 를 살펴봅니다. performCacheEvict() 라는 메서드에서 isCacheWide 면 doClear 를, 그렇지 않으면 doEvict 을 호출합니다.

    private void performCacheEvict(
            CacheOperationContext context, CacheEvictOperation operation, @Nullable Object result) {

        Object key = null;
        for (Cache cache : context.getCaches()) {
            if (operation.isCacheWide()) {
                logInvalidating(context, operation, null);
                doClear(cache, operation.isBeforeInvocation());
            }
            else {
                if (key == null) {
                    key = generateKey(context, result);
                }
                logInvalidating(context, operation, key);
                doEvict(cache, key, operation.isBeforeInvocation());
            }
        }
    }

doClear() 함수는 spring-context/src/main/java/org/springframework/cache/interceptor/AbstractCacheInvoker.java 에 구현되어 있습니다.

    /**
     * Execute {@link Cache#evict(Object)}/{@link Cache#evictIfPresent(Object)} on the
     * specified {@link Cache} and invoke the error handler if an exception occurs.
     */
    protected void doEvict(Cache cache, Object key, boolean immediate) {
        try {
            if (immediate) {
                cache.evictIfPresent(key);
            }
            else {
                cache.evict(key);
            }
        }
        catch (RuntimeException ex) {
            getErrorHandler().handleCacheEvictError(ex, cache, key);
        }
    }

    /**
     * Execute {@link Cache#clear()} on the specified {@link Cache} and
     * invoke the error handler if an exception occurs.
     */
    protected void doClear(Cache cache, boolean immediate) {
        try {
            if (immediate) {
                cache.invalidate();
            }
            else {
                cache.clear();
            }
        }
        catch (RuntimeException ex) {
            getErrorHandler().handleCacheClearError(ex, cache);
        }
    }

이제 호출되는 것은 Cache 구조체라는 것을 알 수 있습니다. cache.invalidate() 거나 cache.clear() 거나…(아마도 cache.clear()겠죠? – 반전은 실제로는 둘다 cache.clear() 입니다.)

그런데 Cache 는 그냥 인터페이스 입니다.

public interface Cache {
    String getName();

    Object getNativeCache();

    @Nullable
    ValueWrapper get(Object key);

    @Nullable
    <T> T get(Object key, @Nullable Class<T> type);

    @Nullable
    <T> T get(Object key, Callable<T> valueLoader);

    void put(Object key, @Nullable Object value);

    @Nullable
    default ValueWrapper putIfAbsent(Object key, @Nullable Object value) {
        ValueWrapper existingValue = get(key);
        if (existingValue == null) {
            put(key, value);
        }
        return existingValue;
    }

    void evict(Object key);

    default boolean evictIfPresent(Object key) {
        evict(key);
        return false;
    }

    void clear();

    interface ValueWrapper {
        @Nullable
        Object get();
    }


    @SuppressWarnings("serial")
    class ValueRetrievalException extends RuntimeException {

        @Nullable
        private final Object key;

        public ValueRetrievalException(@Nullable Object key, Callable<?> loader, Throwable ex) {
            super(String.format("Value for key '%s' could not be loaded using '%s'", key, loader), ex);
            this.key = key;
        }

        @Nullable
        public Object getKey() {
            return this.key;
        }
    }

}

검색해보면 spring-framework 안에도 Cache를 구현한 클래스가 몇 개 있습니다.

  • AbstractValueAdaptingCache
  • NoOpCache
  • TransactionAwareCacheDecorator

spring-data-redis 코드를 보면 바로 Cache Interface 를 구현한 클래스는 없고 발견된 RedisCache 가 AbstractValueAdaptingCache 를 확장해서 구현하고 있습니다.

저는 역으로 ./src/main/java/org/springframework/data/redis/cache/RedisCache.java 라는 파일을 찾아서(이름에 Cache가 있는 파일을) 찾아서 확인을 해보니 AbstractValueAdaptingCache 를 사용하고 있는 것을 볼 수 있습니다.

public class RedisCache extends AbstractValueAdaptingCache {

    private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);

    private final String name;
    private final RedisCacheWriter cacheWriter;
    private final RedisCacheConfiguration cacheConfig;
    private final ConversionService conversionService;

    protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfig) {

        super(cacheConfig.getAllowCacheNullValues());

        Assert.notNull(name, "Name must not be null!");
        Assert.notNull(cacheWriter, "CacheWriter must not be null!");
        Assert.notNull(cacheConfig, "CacheConfig must not be null!");

        this.name = name;
        this.cacheWriter = cacheWriter;
        this.cacheConfig = cacheConfig;
        this.conversionService = cacheConfig.getConversionService();
    }
    ......
    @Override
    public void clear() {

        byte[] pattern = conversionService.convert(createCacheKey("*"), byte[].class);
        cacheWriter.clean(name, pattern);
    }
    ......
}

CacheWriter 의 clean을 호출 하는 것을 볼 수 있습니다. 다만 위의 pattern 을 잘 기억해야 합니다. 결국 “*” 를 가져옵니다. CacheWriter 는 RedisCacheWriter 입니다.

RedisCacheWriter 역시 interface 입니다.

public interface RedisCacheWriter extends CacheStatisticsProvider {
    ......
    void clean(String name, byte[] pattern);
    ......
}

RedisCache 에서 RedisCacheWriter 는 생성되면서 전달 받게 됩니다. 해당 클래스를 구현할 클래스를 찾아보도록 하겠습니다. Spring-data-redis 에서는 RedisCacheWriter 를 구현한 클래스는 DefaultRedisCacheWriter 하나 뿐입니다.

class DefaultRedisCacheWriter implements RedisCacheWriter {

    private final RedisConnectionFactory connectionFactory;
    private final Duration sleepTime;
    private final CacheStatisticsCollector statistics;
    private final BatchStrategy batchStrategy;

    ......
    @Override
    public void clean(String name, byte[] pattern) {

        Assert.notNull(name, "Name must not be null!");
        Assert.notNull(pattern, "Pattern must not be null!");

        execute(name, connection -> {

            boolean wasLocked = false;

            try {

                if (isLockingCacheWriter()) {
                    doLock(name, connection);
                    wasLocked = true;
                }

                long deleteCount = batchStrategy.cleanCache(connection, name, pattern);
                while (deleteCount > Integer.MAX_VALUE) {
                    statistics.incDeletesBy(name, Integer.MAX_VALUE);
                    deleteCount -= Integer.MAX_VALUE;
                }
                statistics.incDeletesBy(name, (int) deleteCount);

            } finally {

                if (wasLocked && isLockingCacheWriter()) {
                    doUnlock(name, connection);
                }
            }

            return "OK";
        });
    }
    ......
}

이제 거의 다 온거 같습니다. batchStrategy.cleanCache() 를 호출하고 있습니다. BatchStrategy 역시 interface 입니다. ./src/main/java/org/springframework/data/redis/cache/BatchStrategy.java 에서 볼 수 있습니다.

public interface BatchStrategy {

    /**
     * Remove all keys following the given pattern.
     *
     * @param connection the connection to use. Must not be {@literal null}.
     * @param name The cache name. Must not be {@literal null}.
     * @param pattern The pattern for the keys to remove. Must not be {@literal null}.
     * @return number of removed keys.
     */
    long cleanCache(RedisConnection connection, String name, byte[] pattern);

}

이제 BatchStrategy를 구현한 클래스를 살펴봅시다. 모두 src/main/java/org/springframework/data/redis/cache/BatchStrategies.java 에 존재하고 있습니다. 여기에는 두 개의 구현체가 존재하는 데 첫번째가 Keys 입니다. 보시면 cleanCache에서 아까 pattern (여기서는 “*” 이 전달되었습니다.) keys 명령을 통해서 패턴을 모두 가져오고 이걸 connection의 del 로 삭제하게 됩니다. 즉 Keys 를 쓰면 전부 지워집니다.

    static class Keys implements BatchStrategy {

        static Keys INSTANCE = new Keys();

        @Override
        public long cleanCache(RedisConnection connection, String name, byte[] pattern) {

            byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
                    .toArray(new byte[0][]);

            if (keys.length > 0) {
                connection.del(keys);
            }

            return keys.length;
        }
    }

두 번째 클래스는 Scan 입니다. Scan 명령을 통해서 Key를 전부 가져와서 다시 connection 의 del 을 통해서 다 지우게 됩니다.

    static class Scan implements BatchStrategy {

        private final int batchSize;

        Scan(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public long cleanCache(RedisConnection connection, String name, byte[] pattern) {

            Cursor<byte[]> cursor = connection.scan(ScanOptions.scanOptions().count(batchSize).match(pattern).build());

            long count = 0;

            PartitionIterator<byte[]> partitions = new PartitionIterator<>(cursor, batchSize);
            while (partitions.hasNext()) {

                List<byte[]> keys = partitions.next();
                count += keys.size();

                if (keys.size() > 0) {
                    connection.del(keys.toArray(new byte[0][]));
                }
            }

            return count;
        }
    }

네이버 클라우드 Redis 설명을 보면 https://guide.ncloud-docs.com/docs/ko/clouddbforredis-spec 다음과 같이 flushdb, flushall, keys 가 막혀있습니다.

일단 코드를 살펴본 대로면 FlushAll, FlushDB의 이슈는 아니므로 KEYS가 막혀있는 것이 문제일 수 있습니다. 그렇다면 어떻게 회피하면 될까요? 아까 BatchStrategy 가 KEYS와 Scan 두 가지 였습니다. 만약에 KEYS가 안된다면 Scan 을 쓰면 되지 않을까요?(기본적으로 Scan이 Default 값이긴 합니다.)

결국 해당 설정은 RedisManager 를 생성할 때 설정할 수 있습니다.

  @Bean
  public CacheManager redisCacheManager() {
    RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()      .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
    
    RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(connectionFactory).cacheDefaults(redisCacheConfiguration).build();
    return redisCacheManager;
  }

보통 위와 같은 형태로 RedisCacheManager 를 구현하게 되는데, 코드를 보면 위의 RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory() 함수는 다음과 같이 구현되어 있습니다.(src/main/java/org/springframework/data/redis/cache/RedisCacheManager.java)

        public static RedisCacheManagerBuilder fromConnectionFactory(RedisConnectionFactory connectionFactory) {

            Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");

            return new RedisCacheManagerBuilder(RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory));
        }

보시면 RedisCacheWriter.nonLockingRedisCacheWriter 를 그냥 호출하고 있습니다. 이제 해당 코드를 살펴봅시다. src/main/java/org/springframework/data/redis/cache/RedisCacheWriter.java 의 nonLockingRedisCacheWriter 를 보면 그렇습니다. 다음과 같이 keys가 그냥 default 네요. 이래서 KEYS 명령이 막혀 있어서 동작하지 않는 것입니다.

    static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {
        return nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.keys());
    }

그럼 이제 이걸 Scan으로만 바꿔주면 동작하겠네요. 다음과 같이 수정하면 됩니다.

RedisCacheManager redisCacheManager = 
  RedisCacheManager.build(RedisCacheWriter.nonLockingRedisCacheWriter(
  connectionFactory, 
  BatchStrategies.scan(1000))
).cacheDefaults(redisCacheConfiguration).build();
return redisCacheManager;

대략적으로 이런 흐름으로 관련 기능들을 쉽게 분석할 수 있습니다.

다만 결론부터 말하면 이건 네이버 클라우드 Redis 서비스의 문제가 아니라 Spring-data-redis 가 Compatibility 를 보장하기 위해서 KEYS를 기본으로 사용하고 있는 게 문제입니다. 실제로 https://github.com/spring-projects/spring-data-redis/pull/532 이런 패치도 올라왔지만, 거부 당했네요.

(이유가, 어차피 데이터가 많으면 DEL 여러개 하다가 Timeout 날꺼야 라는 이유라…) 오래된 프로젝트는 당연히 Compatibility를 지원해야 하니… 안좋은 점이 남아있는…) 그래서 이 부분은 Spring-data-redis의 잘못입니다.

[입 개발] Redis 7.x 에서의 ShardedPubSub

오래간만에 Redis 7에서 도입될 Sharded PubSub 에 대해서 좀 분석을 해보게 되었습니다. Sharded PubSub 은 기존 Redis Cluster 에서의 PubSub 의 단점을 해결하기 위해서 도입된 기술입니다. 먼저 Sharded Pub/Sub을 이야기하기 전에 Redis Cluster에서의 PubSub을 알아보고 단점을 먼저 확인해봅시다.

기본적으로 Redis Cluster 에서의 Pub/Sub은 모든 노드에 데이터를 뿌리게 됩니다. 그래서 Client #1이 한대의 서버에 publish를 하면 해당 노드는 모든 노드(Primary + Replica)들에 publish를 broadcast 하게 되고, 어떤 노드에 subscribe를 하든 해당 broadcast 되는 메시지를 받을 수 있습니다.

이 broadcast 는 클라이언트가 Primary에 subscribe 를 하든 Replica 에 subscribe 를 하든 상관없습니다. 물론 publish 역시 상관이 없습니다.

그리고 subscribe 하는 클라이언트는 해당 서버에 접속을 하게 되죠. 일단 Redis에서의 pub/sub의 메시지 전달은 at most once 입니다. 즉, 최대 한번, 또는 못받을 수가 있는 구조죠.

그럼 뭐가 성능상 문제냐? 라고 물으신다면, 항상 broadcast 하는 것이 문제입니다. 안그래도 Redis Pub/Sub 자체가 pattern 이라는 걸 지원해서, 모든 채널을 확인해야 하는 이슈가 있기 때문에, 보통 메시지 전달은 채널수 + 채널에 붙은 클라이언트 수 만큼 루프를 돌아야 합니다. 항상 모든 서버에 일단 broadcast 해야 한다는게 그래서 전체 성능에 영향을 미칠 수 가 있는거죠.(Redis는 Single Threaded 라 항상 오래 동작하는 기능은 조심해야 하는…), 클러스터가 커지면 커질수록 문제가 더 발생할 수 있습니다.

그럼 Sharded Pub/Sub은 뭐냐? 말 그대로입니다. Sharded 해서 특정 노드에만 해당 채널의 정보를 publish 하도록 하자가 됩니다. 이 구조는 보통 다음과 같습니다.

이는 기존 Redis Cluster 의 특성을 그대로 이용합니다. 일반적으로 key를 crc16으로 Hash 해서 해당 키가 속한 slot 을 처리하는 서버로만 메시지를 전달하게 되는데 (-MOVED 를 이용합니다.), 이 특성을 이용해서 Pub/Sub도 하나의 Shard 군에서만 처리하자 입니다.(제가 노드가 아니라 Shard 군이라고 한 것에 주의하세요.)

먼저 코드부터 확인해보도록 하겠습니다. pubsubtype 이라는 구조체가 ShardedPubSub을 지원하기 위해서 도입이 되었습니다.

typedef struct pubsubtype {
    int shard;
    dict *(*clientPubSubChannels)(client*);
    int (*subscriptionCount)(client*);
    dict **serverPubSubChannels;
    robj **subscribeMsg;
    robj **unsubscribeMsg;
}pubsubtype;

위의 pubsubtype 으로 두 개의 타입이 생성되어 있음. 기본인 pubSubType 과 ShardedPubSub을 위한 pubSubShardType 은 다음과 같습니다.

pubsubtype pubSubType = {
    .shard = 0,
    .clientPubSubChannels = getClientPubSubChannels,
    .subscriptionCount = clientSubscriptionsCount,
    .serverPubSubChannels = &server.pubsub_channels,
    .subscribeMsg = &shared.subscribebulk,
    .unsubscribeMsg = &shared.unsubscribebulk,
};

/*
 * Pub/Sub type for shard level channels bounded to a slot.
 */
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk
};

이를 지원하기 위해서 redisServer 구조체에도 sharded channels을 구독하는 client의 정보를 저장하고 있는 변수가 추가됨.

dict *pubsubshard_channels;  /* Map channels to list of subscribed clients */
int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */

client 구조체에도 이를 지원하기 위한 변수가 추가됨

dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */

여기서 주의해서 봐야 하는 것은 아래처럼 pubsub_channels 과 pubsubshard_channels 가 keylistDictType 으로 구성되어 있다는 것입니다.

server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
server.pubsubshard_channels = dictCreate(&keylistDictType);

getClientPubSubChannels 와 getClientPubSubShardChannels 는 아래와 같이 서로 다른 변수를 전달함.

dict* getClientPubSubChannels(client *c) {
    return c->pubsub_channels;
}

dict* getClientPubSubShardChannels(client *c) {
    return c->pubsubshard_channels;
}

clientSubscriptionsCount 와 clientShardSubscriptionsCount 는 다음과 같다.

/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
    return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
}

/* Return the number of shard level channels a client is subscribed to. */
int clientShardSubscriptionsCount(client *c) {
    return dictSize(c->pubsubshard_channels);
}

먼저 subscribe 하는 코드를 살펴보면 다음과 같다. sharded pubsub 을 위해서 ssubscribe 라는 명령이 존재한다. 코드를 보면 cluster_enabled 상태에서는 해당 채널이 등록되어 있지 않으면 slotToChannelAdd 명령을 통해서 cluster 의 slot_to_channels 에 추가하게 됩니다.

/* SSUBSCRIBE channel [channel ...] */
void ssubscribeCommand(client *c) {
    if (c->flags & CLIENT_DENY_BLOCKING) {
        /* A client that has CLIENT_DENY_BLOCKING flag on
         * expect a reply per command and so can not execute subscribe. */
        addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
        return;
    }

    for (int j = 1; j < c->argc; j++) {
        /* A channel is only considered to be added, if a
         * subscriber exists for it. And if a subscriber
         * already exists the slotToChannel doesn't needs
         * to be incremented. */
        if (server.cluster_enabled &
            (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
            slotToChannelAdd(c->argv[j]->ptr);
        }
        pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
    }
    c->flags |= CLIENT_PUBSUB;
}

그리고

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(*type.serverPubSubChannels, channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel,type);
    return retval;
}

publish 는 spublish 라는 전용 커맨드가 있습니다. publishCommand 는 clusterPropagatePublish 라는 모든 Shard에 데이터를 보내는 것과 달리 spublishCommand 는 clusterPropagatePublishShard 함수는 해당 shard 에만 메세지를 보내게 됩니다.(이 부분이 가장 큰 차이입니다.) – 기존의 Pub/Sub은 모든 노드에 데이터를 보내기 때문에 아무 노드에만 Subscribe 를 해도 메시지를 전달 받을 수 있었지만, Sharded Pub/Sub에서는 이게 불가능합니다.

/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
    int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
    if (server.cluster_enabled) {
        clusterPropagatePublishShard(c->argv[1], c->argv[2]);
    } else {
        forceCommandPropagation(c,PROPAGATE_REPL);
    }
    addReplyLongLong(c,receivers);
}

여기서 살짝 publishCommand 함수를 살펴보면(Sharded PubSub이 아닌 경우) 다음과 같습니다. 거의 동일합니다.

/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
    if (server.sentinel_mode) {
        sentinelPublishCommand(c);
        return;
    }

    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

pubsubPublishMessageInternal 는 세 번째 파라매터로 처음에 소개한 pubsubtype 을 받게 되고 코드는 다음과 같습니다. pattern 을 사용하는 방식은 여기 코드에서도 나오지만 shard type을 지원하지 않습니다. 이는 pattern형식은 전체 노드를 subscribe 하지 않고서는 지원할 수 없기 때문입니다. (패턴의 hash 값이 어떤 slot에 들어갈지를 알 수 없음.)

/*
 * Publish a message to all the subscribers.
 */
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(*type.serverPubSubChannels, channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message);
            updateClientMemUsage(c);
            receivers++;
        }
    }

    if (type.shard) {
        /* Shard pubsub ignores patterns. */
        return receivers;
    }

    /* Send to clients listening to matching channels */
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
        channel = getDecodedObject(channel);
        while((de = dictNext(di)) != NULL) {
            robj *pattern = dictGetKey(de);
            list *clients = dictGetVal(de);
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;

            listRewind(clients,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                updateClientMemUsage(c);
                receivers++;
            }
        }
        decrRefCount(channel);
        dictReleaseIterator(di);
    }

여기서 볼 부분은 실제로 위의 해당 channel 이 속한 cluster 로 clusterSendPublish를 통해서 전달되게 됩니다. 여기서 clusterGetNodesServingMySlots 에서는 해당 slot 을 가진 primary 와 replica를 모두 전달받게 됩니다. 즉, 보통 최소 2개 정도가 정상입니다.

/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub shard support
 *
 * Publish this message across the slot (primary/replica).
 * -------------------------------------------------------------------------- */
void clusterPropagatePublishShard(robj *channel, robj *message) {
    list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
    if (listLength(nodes_for_slot) != 0) {
        listIter li;
        listNode *ln;
        listRewind(nodes_for_slot, &li);
        while((ln = listNext(&li))) {
            clusterNode *node = listNodeValue(ln);
            if (node != myself) {
                clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
            }
        }
    }
    listRelease(nodes_for_slot);
}

clusterSendPublish 함수는 다음과 같이 publish 메세지를 해당 slot 을 가진 노드에 전달합니다.

/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster.
 *
 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
 * As all the struct is used as a buffer, when more than 8 bytes are copied into
 * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
 * positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
    unsigned char *payload;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,type);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = (unsigned char*)buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)buf) zfree(payload);
}

그래서 Sharded Pub/Sub 은 ssubscribe, spublish 등의 명령을 가지고 있습니다. 그래서 테스트를 해보면 다음과 같은 결과를 볼 수 있습니다. ssubscribe test 를 하면 7003 으로 -MOVED가 전송됩니다. 7003으로 접속해서 다시 ssubscribe를 하면 성공함을 볼 수 있습니다.

➜  redis git:(unstable) ✗ telnet 0 7005
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  redis git:(unstable) ✗ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
*3
$10
ssubscribe
$4
test
:1

spublish 도 마찬가지 입니다.

➜  ~  telnet 0 7002
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  ~ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
:1

실제로 spublish 를 하면 이제 해당 shard 군만 받게 되는 것을 볼 수 있습니다. 이로 인해서 broadcast 를 줄여서, 실제 부하를 줄이겠다는게 Redis 7.x에서 나오는 Sharded Pub/Sub 입니다. 다만 제가 보기에는 Sharded Pub/Sub도 명확한 한계가 있는 방식입니다. 물론, 이전의 방식이 더 좋다는 것은 아니지만, Redis 가 진짜 대규모의 Pub/Sub을 처리하기 위해서는 좀 더 많은 구조적인 개선이 필요할듯 합니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #3

로그를 분석하거나 저장할 필요가 없는 서비스면 상관이 없겠지만, 결국 대부분의 서비스는 로그를 저장하고 이를 통해서 서비스를 상황을 분석하게 됩니다.(혼자서 테스트로만 쓰는 서비스면 그럴 필요가 없죠.) 그래서 이제 마지막으로 어떻게 s3에 저장할 것인가에 대해서 얘기를 할려고 합니다. 2부에서도 보였지만, 아래 형태 처럼 S3에 저장이되면 EMR, Athena 등을 이용해서 처리를 할 수 있습니다.

가장 쉬운 방법은 해당 CloudWatch Log를 S3로 덤프하는 것입니다. 아래 처럼 CloudWatch 에서 Export data to Amazon S3를 통해서 덤프를 하는 것입니다.

해당 메뉴를 선택하고, 덤프할 시간대를 지정하면 해당 경로에 덤프가 되게 됩니다. 여기에 주의할 것은 CloudWatch Log Stream 은 로컬 시간으로 보여지지만, 덤프는 UTC 기준으로 설정해야 한다는 부분입니다.

가장 쉽고, 쉽게 생성할 수 있습니다. API로도 쉽게 가능한데, 단점(?)은 로그가 cloudwatch에서 보이는 형태로 그대로 저장됩니다. 예를 들어 우리의 Log 자체는 json 형태인데 cloud watch 에는 시간 날짜 서버 주소 등이 추가되어서 다음과 같은 형태로 저장되게 됩니다.

2022-02-24T15:15:41.565Z Feb 25 00:15:41 ip-10-10-104-20 web: {"timestamp": "2022-02-25T00:15:41" 어쩌구 저쩌구 원래 로그}

그리고 실시간이 아니라 배치 형태입니다. 물론 (1분 단위로 저장하면 될것 같긴 한데…) 그리고 로그 구조도 위의 s3 bucket prefix 설정 이후에 {uuid}/{logstream name}/000000.gz 이런식으로 저장되게 됩니다.

그럼 실시간으로 하고 싶다면? kinesis firehose delivery stream 을 사용할 수 있습니다. create delivery stream 을 통해서 firehose delivery stream 을 생성할 수 있습니다.

이때 기본적으로 Source를 DIRECT_PUT으로 설정하고Destination 을 Amazon S3 로 설정하면 됩니다.

그리고 어디에 저장할지 S3 Bucket 과 S3 bucket Prefix 를 지정하면 됩니다. 항상 저 prefix 뒤에 YYYY/MM/dd/HH 가 붙게됩니다.(UTC 기준입니다.)

이런 작업을 하기 전에 해당 firehose 가 s3에 쓰기 위한 IAM 과 cloudwatch에서 해당 firehose delivery stream 을 쓰기 위한 IAM을 두 개 생성해야 합니다.

그리고 이제 cloudwatch에서 Subscription Filters 를 이용해서 로그를 firehose 에 준 실시간(몇분 정도의 데이터가 모여서 전달이 되는 걸로 보입니다.) 형태로 아까 지정한 s3 경로에 데이터가 저장되게 됩니다.)

그런데 문제는 Subscription Filters 형태로 그냥 저장을 해버리면, 로그 포맷이 다음과 같은 형태입니다.

{"messageType":"DATA_MESSAGE","owner":"171305364530","logGroup":"/aws/elasticbeanstalk/myservervice/var/log/myservice/reqlog.log","logStream":"myservice/i-0d93c1be6e7f0b515","subscriptionFilters":["adminlog_to_s3"],"logEvents":[{"id":"36685096528927476510604570585254667982719470768821960704","timestamp":1645016621747,"message":"{\"logType\":\"BasicErrorController#error\",\"server_timestamp\":1645016613084,\"remote_ip\":\"20.114.132.182\",\"userId\":-1,\"url\":\"/error\",\"status\":500}"}]}{"messageType":"DATA_MESSAGE","owner":"171305364530","logGroup":"/aws/elasticbeanstalk/myservice/var/log/myservice/reqlog.log","logStream":"i-0d93c1be6e7f0b515","subscriptionFilters":["adminlog_to_s3"],"logEvents":[{"id":"36685096528927476510604570585230365759195078421286944768","timestamp":1645016621747,"message":"{\"logType\":\"BasicErrorController#error\",\"server_timestamp\":1645016613084,\"remote_ip\":\"20.114.132.182\",\"userId\":-1,\"url\":\"/error\",\"status\":500}"}]}

Subscription Filter에서 저런 형태로 데이터를 Firehose로 넘겨주게됩니다. logEvents 안에 몇개의 로그들이 배열로 들어가 있고, 그 중에 message 부분에 실제 저장한 로그가 들어가 있습니다.. (다음 로그도 합쳐져서 저장이 되어서 json 파싱도 어려운…)

그러면 이걸 어떻게 하는가? 실제로 Subscription Filter를 lambda로 만들어서 lambda로 데이터를 보내고, 해당 데이터만 추출해서 데이터를 저장하는 형태로 사용하면 원래 파일에 남아있던 로그만 저장할 수 가 있습니다.

실제로 이 부분을 만들어야 하나 싶었는데, 다음을 이용하면 바로 aws lambda 로 설정이 됩니다. https://github.com/jlhood/json-lambda-logs-to-kinesis-firehose

코드를 보면 실제로 특별한 작업을 하는 것은 아니고 다음과 같습니다. 즉 Subscription Filter로 인해서 발생한 로그가 AWS Lambda의 handler 를 호출하면, 아까 Firehose 로그 형식을 읽어서 파싱합니다. logEvents 부분을 읽어서 개별로 FIREHOSE의 해당 경로에 저장하게 됩니다.

FIREHOSE = boto3.client('firehose')


def handler(event, context):
    """Forward JSON-formatted CW Log events to Firehose Delivery Stream."""
    LOG.debug('Received event: %s', event)

    log_messages = _get_log_messages(event)
    for log_message in log_messages:
        if _is_json(log_message):
            if not log_message.endswith('\n'):
                log_message += '\n'

            FIREHOSE.put_record(
                DeliveryStreamName=config.DELIVERY_STREAM_NAME,
                Record={
                    'Data': log_message
                }
            )


def _get_log_messages(event):
    data = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data'])))
    return [log_event['message'] for log_event in data['logEvents']]

위의 Lambda 를 등록하면 이제 해당 로그가 다음 같은 구조로 흐르게 됩니다.

개인적으로는 실시간이 필요없다면 그냥 export 방식이 가장 수월해 보입니다. (필요하면 주기를 좀 줄여서 실시간 처럼 보일 수도…)

저렇게 S3에 저장이 된 후에 EMR등을 이용해서 분석을 할 수 있습니다. 아마 초반에는 실시간 데이터는 필요 없을듯 하니, 최대한 간단하게 가져갈 수 있는게 좋을듯 합니다. 다만… 세상이 그렇게 쉽지 많은 않은… 보안 로그도 관리해야하고… 핀테크는 제약이 좀 더 많네요.

스타트업에서의 목표는 최소 비용(사람의 노력이 가장 큰 비용입니다.)으로 어느정도의 효과를 뽑아내는 거라고 생각합니다.(최대가 아닙니다.)

데이터 엔지니어어 생활도 오래한거 같지만… 데이터쪽은 항상 뭔가 할려고 하니 어렵네요. 일단은 이렇게 땜방을 치고… 나중에 좀 더 고도화를 해야…

이렇게 로그를 수집하려는 이유는, 일단은 에러로그 같은 걸 좀 쉽게 보기 위해서… DB를 최대한 안 읽고, 필요한 현재 상태를 파악하기 위해서 입니다. 사실 로그에서 가장 중요한 것은, 어떻게 구축하냐가 아니라, 어떤 정보를 남기는가를 정의하는 것입니다. DB에 있는 데이터를 계속 접근해야 하면, 데이터 량이 작을때는 별 문제가 아니지만, 나중에는 DB에서 그 데이터를 읽기 위해서 많은 비용을 들이게 됩니다.(단점은 로그가 안정적인가? – 유실은 없을까 등… 고민이 많습니다.)

어서 빨리 서비스가 성장해서 데이터 처리에 더 많은 고민을 할 수 있으면 좋겠습니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #2

1부에서는 CloudWatch Log Stream 에 데이터를 저장하는 것 까지 살펴보았습니다. 그런데 그렇게 로그를 보낸다고 하면 고민해야 할 부분들이 있습니다.

  • 로그 파일의 사이즈
    • /var/log/myservice/myservice.log 에 파일을 저장한다고 가정을 하겠습니다. 그러면 해당 파일에 계속 로그가 쌓이다 보면 어떤 문제가 발생하게 될까요?
    • 계속 데이터가 쌓이다보면, 결국 해당 디스크의 디스크 용량을 서비스 로그가 전부 디스크를 사용하게 되면, 해당 인스턴스에 문제가 발생하게 될 것입니다.

그렇다면 위의 문제를 기존에 beanstalk 에서는 어떻게 사용하고 있을까요? beanstalk의 기본 설정으로는 logrotate 를 사용하고 있습니다. logrotate 에 대한 기본 설명 자체는 아래 블로그가 잘 설명하고 있어서 이것으로 대체하도록 하겠습니다.

https://velog.io/@gillog/logrotate

해당 설정은 beanstalk를 사용하면 /etc/logrotate.elasticbeanstalk.hourly 를 보면 확인할 수 있습니다.

[ec2-user@ ~]$ cd /etc/logrotate.elasticbeanstalk.hourly/
[ec2-user@ logrotate.elasticbeanstalk.hourly]$ ll
total 32
-rw-r--r-- 1 root root 157 Feb 11 00:54 logrotate.elasticbeanstalk.eb-engine.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.eb-hooks.conf
-rw-r--r-- 1 root root 171 Feb 15 11:28 logrotate.elasticbeanstalk.healthd.conf
-rw-r--r-- 1 root root 157 Feb 15 11:28 logrotate.elasticbeanstalk.nginx.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.web-stderr.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.web-stdout.conf

여기에 logrotate 필요한 로그에 대한 설정을 다음과 같이 추가해주면 됩니다.

/var/log/myservice/*.log {
 su root root
 size 1M
 rotate 5
 missingok
 compress
 notifempty
 copytruncate
 dateext
 dateformat %s
 olddir /var/log/rotated
}

이 때 해당 설정 파일의 umask 는 444 나 644 가 되어야 합니다. 위의 설정도 .ebextention의 files 섹션에 내용을 추가함으로써 생성할 수 있습니다.

  "/etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.reqlog.conf" :
    mode: "000444"
    owner: root
    group: root
    content: |
      /var/log/lemontree-admin/*.log {
       su root root
       size 1M
       rotate 5
       missingok
       compress
       notifempty
       copytruncate
       dateext
       dateformat %s
       olddir /var/log/rotated 
      }

그럼 위의 설정만으로 과연 해당 로그가 logrotate가 되게 될까요? 위의 설정중에 olddir 을 보면 logrotate가 된 파일이 저장되게 됩니다. 위의 rotate 5 설정에 의해서 최고 5개만 남게 되죠.

/var/log/rotated 로 가보면 다음과 같이 파일 목록이 존재하게 됩니다. 파일 뒤에 붙는 이름의 형식은 위의 설정의 dateformat %s 를 사용하면 timestamp 로, 아니면 YYYYMMDD나 순서등으로 이름이 생성됩니다.

[ec2-user@ip-]$ logrotate.elasticbeanstalk.hourly]$ cd /var/log/rotated/
[ec2-user@ip rotated]$ ll
total 72
-rw-r--r-- 1 root   root   5444 Feb 11 02:42 eb-engine.log1644516061.gz
-rw-r--r-- 1 root   root   5678 Feb 12 01:36 eb-engine.log1644598861.gz
-rw-r--r-- 1 root   root   3888 Feb 12 13:46 eb-engine.log1644642061.gz
-rw-r--r-- 1 root   root   5664 Feb 12 14:25 eb-engine.log1644645661.gz
-rw-r--r-- 1 root   root   3899 Feb 15 11:28 eb-engine.log1644894061.gz
-rw------- 1 root   root   1851 Feb 16 20:31 web.stdout.log1645012861.gz
-rw------- 1 root   root   1843 Feb 16 21:57 web.stdout.log1645016462.gz
-rw------- 1 root   root   1693 Feb 16 22:44 web.stdout.log1645020061.gz
-rw------- 1 root   root   3905 Feb 16 23:43 web.stdout.log1645023661.gz
-rw------- 1 root   root   2076 Feb 17 00:55 web.stdout.log1645027261.gz

logrotate 를 사용할 경우, 만약에 해당 log 사이즈가 많고 지속적으로 데이터가 저장되고 있으면 logrotate가 발생하는 순간에 많은 부하가 발생할 수 있습니다. 이것은 logrotate의 동작 특성을 살펴보면 알 수 있는데 위의 설정에서 copytruncate 옵션을 주면 다음과 같이 처리되게 됩니다.

  • 먼저 logrotate 는 임시 파일을 생성합니다.
  • tmpfile의 이름을 필요한 형태로 변경합니다.
  • Myserver.log 의 내용을 Myserver.log.YYYYMMDD 로 복사합니다. 이 때 데이터의 양이 많으면 복사 작업 때문에 시스템 IO가 많이 발생하고 이때 서비스 timeout 과 로그 유실이 발생할 수 있습니다.
  • 최종적으로 Myserver.log 의 내용을 지우고 다시 해당 Myserver.log 에 로그를 쓰게 됩니다.

여기서 발생할 수 있는 부분에 대해서는 아래 블로그를 참고하면 될듯합니다.

https://brunch.co.kr/@alden/27 – 참고하고 보니, 갓앨든(강진우) 님의 브런치네요. 사랑해요 앨든!!!

그런데 이렇게 logrotate 설정만으로는 logrotate가 되지 않습니다. 실제로 해당 logrotate를 매 시간마다 실행시켜 주는 것은 cron 에 의해서 동작하고 있습니다. 그래서 실제로 cron 작업을 추가하는 마지막 작업이 있어야 합니다.

이것은 /etc/cron.hourly/ 를 살펴보면 쉽게 찾을 수 있습니다.

[ec2-user@ip- rotated]$ cd /etc/cron.hourly/
[ec2-user@ip- cron.hourly]$ ll
total 40
-rwxr-xr-x 1 root root 392 Jan 16  2020 0anacron
-rwxr-xr-x 1 root root 121 Feb 15 11:28 cron.logcleanup.elasticbeanstalk.healthd-proxy.conf
-rwxr-xr-x 1 root root 152 Feb 15 11:28 cron.logrotate.elasticbeanstalk.eb-engine.conf
-rwxr-xr-x 1 root root 151 Feb 15 11:28 cron.logrotate.elasticbeanstalk.eb-hooks.conf
-rwxr-xr-x 1 root root 150 Feb 15 11:28 cron.logrotate.elasticbeanstalk.healthd.conf
-rwxr-xr-x 1 root root 148 Feb 15 11:28 cron.logrotate.elasticbeanstalk.nginx.conf
-rwxr-xr-x 1 root root 149 Feb 12 18:07 cron.logrotate.elasticbeanstalk.reqlog.conf.bak
-rwxr-xr-x 1 root root 153 Feb 15 11:28 cron.logrotate.elasticbeanstalk.web-stderr.conf
-rwxr-xr-x 1 root root 153 Feb 15 11:28 cron.logrotate.elasticbeanstalk.web-stdout.conf

cron 설정을 보시면 다음과 같이 내용이 들어가 있습니다.

#!/bin/sh
test -x /usr/sbin/logrotate || exit 0
/usr/sbin/logrotate -f /etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.web-stdout.conf

이 해당 설정도 이제 files 섹션에 추가해주면 logrotate 설정이 완료하게 됩니다.

  "/etc/cron.hourly/cron.logrotate.elasticbeanstalk.myservice.conf" :
    mode: "000444"
    owner: root
    group: root
    content: |
      #!/bin/sh
      test -x /usr/sbin/logrotate || exit 0
      /usr/sbin/logrotate -f /etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.myservice.conf

이제 이것으로 기본적으로 서버의 설정은 끝났습니다.

  • 1부에서는 로그의 수집
  • 2부에서는 서버 안정성을 위한 logrotate 설정을 살펴보았습니다.

그런데 과연 이것만으로 다 된것일까요?

log의 retention 설정이 중요한데, cloudwatch 에 있는 log group의 retention 은 저희가 설정하기 나름입니다. 1일에서 영구적으로도 가능합니다. 해당 로그는 logs insight를 통해서 살펴 볼 수 있습니다. 다만 이 retention 기간이 지나면 없어지기 때문에 영구적으로 설정하거나, 잘 백업을 해둬야 합니다.

사실 로그는 이렇게 확인을 하기 위해서도 저장하지만, 분석한 필요한 정보들을 포함하고 있기 때문에, 분석을 위해서 이를 저장할 필요가 있습니다. AWS에서 가장 쉽게 사용하는 분석툴은 athena 나 EMR을 이용해서 분석을 해야 할텐데요. 결국 이를 위해서는 cloudwatch log stream에 있는 로그들을 s3에 적절한 형태로 저장할 필요가 있습니다.

3부에서는 이 부분에 대해서 다뤄보도록 하겠습니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #1

스타트업에 다니고 있다보니 당장 필요하지는 않지만, 로그 수집 시스템을 구축해야 했습니다.(당장 필요하지 않은데 왜 하는거지!!!) 라는 의문이 들 수 있는데…(들어야 정상입니다.), 사실 꼭 초반에 구성하지 않더라도 큰 문제는 없습니다. 아직 저희는 서비스가 출시 전이기 때문에, 출시 전에 로그를 수집하더라도 큰 문제는 없지만, 다음과 같은 생각이 있었습니다.(만구 제 생각입니다.)

  • 서비스 출시 전으로 갈 수록 서비스 관련 이슈가 많으므로 로그 시스템을 구축할 수 있는 시간이 줄어들 것이다.
  • 그러면, 실제로 필요한 로그들을 정리하기 어려워서 놓치는 부분들이 생기고, 필요한 로그들이 있는지 체크하기 어려울 것이다.

그래서 일단은 로그 수집 시스템을 구축하려고 했는데, 그렇다고 기존에 다른 회사에서 사용하던 로그 수집 시스템을 그대로 사용하기에는 관리 비용이 비싸다고 생각했습니다.

beanstalk 에서도 자체적으로 로그를 보여주게 되는데, Beanstalk 에서 로그가 보고싶은 environment를 선택하고 Logs 를 선택합니다.

그리고 Request Logs 를 선택하고 Last 100 Lines 나 Full Logs를 선택해서 원하는 로그를 볼 수 있습니다.

개발중에는 대략 이렇게 로그를 볼 수도 있습니다. 그런데 서비스를 진행하면서 이렇게 로그를 볼 수 는 없습니다. 왜냐하면 로그는 서비스의 현재와 과거의 모습, 그리고 우리가 어떻게 성장을 해야 하는지를 보여주는 핵심 지표이기 때문입니다.

예를 들어 로그를 모아서 다음과 같은 정보들을 나중에 보여주고 싶습니다.

  • 현재 서비스의 사용자 수(DAU, WAU, MAU)
  • 현재 가장 많이 발생하는 Exception(장애시 이런게 엄청 중요합니다.)
  • 어떤 이벤트로 변하는 사용자 수

그래서 로그를 수집하는 일반적인 파이프라인은 다음과 같은 형태를 많이 사용합니다. 그리고 저기서 ELK 가 붙어서 비주얼라이제이션까지 처리를 많이 하게 되죠.

처음에는 위의 형태를 사용할까 생각을 했는데 몇 가지 고민이 생겼습니다.

  • 우리는 AWS Beanstalk 를 쓰고 있다.
    • beanstalk 라고 filebeat를 설치하는 것이 어렵지는 않습니다.
  • Kafka를 써야 하나?
    • Kafka는 굉장히 좋은 툴이고, AWS는 Managed Kafka 까지 제공해주고 있습니다. 그럼에도 불구하고, 우리가 Kafka의 운영에 신경을 써야 한다는 것이 고민 포인트였습니다.
  • ELK를 다 써야 하나?
    • elastic도 굉장히 좋은 솔루션인데, 스타트업에서는 살짝 고민되는 비용이기도 합니다.
    • 사실, 반대로는 스타트업이라서 로그가 별로 없어서 써도 상관없는 비용이기도 한…(무슨 소리냐!!!)

그래서 이것 저것을 조사하다 보니, 소규모에는 CloudWatch Log 정도로도 충분할꺼라는 지인들의 조언을 많이 받게 되어서 (DataDog이나 어려가지 SaaS 솔루션의 추천도 많고, 예전에 써본 경험도 있긴 한데… 흑흑흑 쓸 수 없었던…) 해당 솔루션 들을 리서치 하게 되었다.

그래서 현재는 (아직 구축이 끝나지 않았다!!!) 다음과 같다. 현재 필요한 로그들을 cloudwatch 에 까지 보내는 것은 테스트가 되었고, 뒷 부분에 lambda 로 s3에 저장을 할지, firehose 로 저장을 할지를 고민중이다.(이것도 사실 테스트는 되었는데……)

즉 지금 보는 솔루션은 beanstalk 에 awslogs 와 cwlogs 를 통해서 로그를 cloudwatch 로 저장하고 이를 다시 s3에 저장한 다음 분석 작업을 진행하는 것이다.(분석 작업은 현재 없다!!! 엉???)

그런데 beanstalk 가 조금 애매한 부분이 terraform 으로 environment 는 설정이 가능한데, 거기의 세세한 설정은 또 다른 방식으로 해야하는 것들이 있습니다. 예를 들어, 기본적으로 바로 beanstalk를 사용한다면, 기본 로그들 말고 우리가 남겨야 하는 custom log 들이 있습니다. 이런 것들을 어떻게 수집해야 할까요?

사실 beanstalk에서는 아주 간단하게 적용할 수 있는 방법을 제공합니다. 첫번째는 환경 설정에서 로그를 남기는 설정을 켜두는 것입니다.

Beanstalk 설정에서 Configuration 을 선택하고 Software 에서 Edit 를 누르면 설정을 쉽게 할 수 있습니다. 아래에서 Enabled 만 설정하시면 자동으로 cloudwatch에 로그가 남게 됩니다.

위의 설정을 켜고 CloudWatch 메뉴로 가서 Log groups 를 선택하게 되면 바로 기본적으로 로그가 남는 것을 알 수 있습니다. 예를 들어 자바 App을 구성하시면 5개의 로그가 기본적으로 생깁니다.(nginx 는 nginx를 proxy로 쓴다면…)

  • /aws/elasticbeanstalk/{environment}/var/log/eb-engine.log
  • /aws/elasticbeanstalk/{environment}/var/log/eb-hooks.log
  • /aws/elasticbeanstalk/{environment}/var/log/nginx/access.log
  • /aws/elasticbeanstalk/{environment}/var/log/nginx/error.log
  • /aws/elasticbeanstalk/{environment}/var/log/web.stdout.log

그런데 사실, 모든 출력을 web.stdout.log 로 출력을 해도 되지만, 관리를 위해서 필요한 로그를 추가를 하게 됩니다.

그렇다면 이 것은 어떻게 추가를 해야할까요? 일단 수동으로 하는 건 최대한 피해야 하고(수동으로 할 수 있는 방법도 없는 T.T)

beanstalk 는 이걸 위해 .ebextensions 에 확장을 위한 방법을 제공합니다. 사실 이미 aws 쪽에서는 이에 대한 Sample 을 제공하고 있습니다.(https://github.com/awsdocs/elastic-beanstalk-samples/blob/master/configuration-files/aws-provided/instance-configuration/logs-streamtocloudwatch-linux.config)

일단 초기에는 다음과 같이 설정했습니다.

packages:
  yum:
    awslogs: []

commands:
  01-awslog:
    command: systemctl enable awslogsd.service
  02-awslog:
    command: systemctl restart awslogsd


files:
  "/etc/awslogs/awslogs.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [general]
      state_file = /var/lib/awslogs/agent-state

  "/etc/awslogs/config/myservice.conf":
    mode: "000600"
    owner: root
    group: root
    content: |
      [/var/log/myservice/myservice.log]
      log_group_name = `{"Fn::Join":["/", ["/aws/elasticbeanstalk", "myservice", "var/log/myservice/myservice.log"]]}`
      log_stream_name = {instance_id}
      file = /var/log/myservice/myservice.log

  "/opt/elasticbeanstalk/tasks/bundlelogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

  "/opt/elasticbeanstalk/tasks/taillogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

위의 설정을 살펴보면 packages 는 Amazon linux 2 부터는 awslogs 가 설치가 안되어 있기 때문에 기본적으로 설치를 해주는 부분입니다.

packages:
  yum:
    awslogs: []

두 번째 commands 는 awslogs 를 설정하고 재시작 하는 부분입니다.

commands:
  01-awslog:
    command: systemctl enable awslogsd.service
  02-awslog:
    command: systemctl restart awslogsd

세 번째 files 섹션은 해당 파일들을 생성합니다. 여기에 적는 파일들은 설정파일들입니다. awslogs.conf 는 state_file을 저장하는 해당 파일은 filebeat 같은 종류의 파일들이 현재 데이터를 어디까지 보냈는지를 기록해두는 것처럼 awslogs가 보통 파일의 어디까지를 전송했는지 position 값을 가지고 있습니다.

  "/etc/awslogs/awslogs.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [general]
      state_file = /var/lib/awslogs/agent-state

이제 custom 로그를 cloudwatch 로 보내는 설정입니다. /etc/awslogs/config 에 들어가는 설정이 cloudwatch로 보내는 설정들이라고 보시면 됩니다. 혹시나 추가하고 싶은 파일들이 더 있으면 더 만들어주거나 하면 됩니다.

  "/etc/awslogs/config/myservice.conf":
    mode: "000600"
    owner: root
    group: root
    content: |
      [/var/log/myservice/myservice.log]
      log_group_name = `{"Fn::Join":["/", ["/aws/elasticbeanstalk", "myservice", "var/log/myservice/myservice.log"]]}`
      log_stream_name = {instance_id}
      file = /var/log/myservice/myservice.log

그리고 아래의 두 설정은 아까 beanstalk environment 설정에서 본 request logs 에서 Last 100 라인과 모두 보기를 위해서 추가해주는 설정입니다. 저기에 파일 내용을 보고 해당 파일들의 내용을 보여줍니다.

"/opt/elasticbeanstalk/tasks/bundlelogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

  "/opt/elasticbeanstalk/tasks/taillogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

이렇게 실행을 시키고 보면, 실행이 안되었습니다. 이유를 살펴보니, Log를 생성해야 하는 폴더가 없고 권한이 없어서 App이 실행이 안되었습니다. 그래서 commands 섹션을 다음과 같이 바꿔줍니다. 필요한 폴더를 생성해주는 것입니다.

commands:
  01-directory:
    command: "sudo mkdir -p /var/log/myservice/"
  02-directory:
    command: "sudo chmod 777 /var/log/myservice/"
  03-awslog:
    command: systemctl enable awslogsd.service
  04-awslog:
    command: systemctl restart awslogsd

이렇게 하고 보니, 이제 beanstalk 환경에서는 마지막 100라인등의 로그에는 잘 추가가 되었는데 cloudwatch에는 보이지 않았습니다. 이상하다 싶어서 설정을 바꿔보고 이름을 바꿔봐도 T.T 로그가 보이지 않았습니다.

이때 문제를 확인하는 방법중에 하나가 직접 해당 인스턴스에 들어가서 설정 파일들을 살펴보는 것입니다. ssh로 접근을 해서 설정을 보아도 크게 이상한게 없었는데…(제 눈이 침침해서…)

위에서 AWS 공식 예제와 다른 부분을 살펴보면 다음 부분이 있습니다. 제가 실수했던 부분은 해당 파일이 있는지만 확인하고 넘어간 것이었는데…

  "/etc/awslogs/awscli.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [plugins]
      cwlogs = cwlogs
      [default]
      region = `{"Ref":"AWS::Region"}`

ssh 로 직접 들어가서 해당 파일을 확인해보니… 파일 내용이 다음과 같았습니다.

[plugins]
cwlogs = cwlogs
[default]
region = us-east-1

네네, 제 Region은 ap-northeast-2 인데… 저 설정이 us-east-1 입니다. 제가 아까 awslogs 에 있는 설정은 cloudwatch 로그를 위한 것이라고 했습니다. cwlogs 가 cloudwatch logs인거죠.

그런데 이상한 부분은 일단 기존의 다른 로그들은 ap-northeast-2로 가고 있는데, custom 로그만 머나먼 버지니아(us-east-1)로 가고 있던 것입니다. 그래서 해당 region 의 cloudwatch를 보니… 로그가 거기에 쌓이고 있는…

이렇게 cloudwatch에 로그가 쌓이는 걸 확인했지만 사실 여기서 cloudwatch에 데이터를 보내는 것이 완료된 것이 아닙니다. 어떤 부분이 남아있을까요? 그것은 2부에서…

[입 개발] Redis LRU(Least Recently Used Algorithm)에 대해서

Redis 는 데이터를 영구적으로 저장하는 Persistent Store 의 역할도 하지만, 주로 데이터의 접근을 빠르게 하기 위한 Cache 로 많이 사용됩니다.

Redis 는 메모리를 데이터 저장소로 이용하기 때문에, Disk를 사용하는 다른 솔루션 보다 적은 양의 데이터를 저장하게 되고, 이로 인해서 최대치 까지 데이터를 저장하면, 새로운 데이터를 저장하기 위해서 기존 데이터를 버려야 하는 작업을 해야 합니다. 이를 eviction 이라고 부릅니다. – 다만 Expire 가 되어서 사라지는 것은 Eviction 이라고 부르지 않습니다. (실제로 Redis Enterprise 솔루션을 보면 이렇게 eviction 되어야 하는 데이터를 flash disk 에 저장해서 좀 더 빠른 접근을 하게 하는 솔루션도 있습니다.)

eviction 을 위해서 사용하는 방식중에 가장 일반적인 방식중에 하나가 LRU 입니다. LRU는 Least Recently Used Algorithm 으로, 가장 오랫동안 참조되지 않은 페이지를 교체하는 기법입니다. 보통 OS에서 페이징에서 사용하는 메모리를 교체하는 방식에서 사용되고 있으면 보통 페이지 교체 알고리즘은 다음과 같습니다.

알고리즘비고
FIFO(First In First Out)가장 먼저 메모리에 올라온 페이지를 교체
* Belady’s Anomaly(FIFO anomaly)가 발생할 수 있음
OPT(OPTimal Page Replacement)앞으로 가장 오랫동안 사용되지 않을 페이지를 교체
* 앞으로 프로세스가 사용할 페이지를 미리 알아야해서 불가능
LRU(Least Recently Used)가장 오랫동안 사용되지 않은 페이지를 교체
Count-BasedOPT와 비슷한 성능을 보여주고 같은 이유로 현실적으로 사용불가능
LFU(Least Frequently Used)참조 횟수가 가장 적은 페이지를 교체
MFU(Most Frequently Used)참조 횟수가 가장 많은 페이지를 교체
NUR(Not Used Recently)최근에 사용하지 않은 페이지를 교체(클럭 알고리즘)
Random아무거나 교체…

Redis 에서 지원해주는 알고리즘은 LRU와 LFU, RANDOM의 3가지를 제공하고 있습니다. 오늘은 여기서 LRU를 살펴보도록 하겠습니다.

보통 LRU를 구현하는 방식은 다음과 같습니다.

  • Page에 데이터를 접근한 시간에 대한 시간 데이터를 저장해서 해당 값이 가장 오래된 페이지를 교체
  • Page를 List 형태로 저장한 다음 사용된 Page를 항상 List 의 제일 앞으로 올리고, 데이터가 존재하지 않으면 가장 끝의 데이터를 삭제하고, 새로운 데이터를 List의 위에 올리는 방식으로 구현

다음 그림을 살펴보면 가장 사용되지 않은 페이지들이 교체되는 것을 볼 수 있습니다.

다시 Redis 로 돌아와서 Redis 에서 이런 Eviction 이 동작하게 하기 위해서는 maxmemory 를 설정해야 합니다. maxmemory 설정이 없으면 32bit 에서는 3GB로 제한이 자동으로 설정되고 64bit 에서는 메모리가 부족할 때 까지 계속 저장하게 됩니다.

maxmemory 10G

Redis Evicition Policy

Redis 에서는 다음과 같은 Evicition 정책을 제공합니다.

Policy비고
noeviction설정된 메모리 한계에 도달하면, 데이터 추가 명령 실행이 실패하게 됩니다.
allkeys-lruLRU 정책을 모든 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다.
volatile-lruLRU 정책을 Expire 가 설정된 Key에 대해서 적용해서 데이터를 삭제하게 됩니다
allkeys-random모든 Key에 대해서 Random 하게 데이터를 삭제하게 됩니다.
volatile-randomExipre 가 설정된 Key 에 대해서 Random 하게 삭제하게 됩니다.
allkeys-lfuLFU 정책을 모든 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다.
volatile-lfuLFU 정책을 Expire 가 설정된 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다
volatile-ttlExpire 가 설정된 key 에 대해서 ttl이 짧은 순서로 먼저 삭제 하게 됩니다.

Eviction Process 어떻게 동작하는가?

Redis 에서 Evcition 은 메모리가 부족할 때 동작하게 되는데 다음과 같습니다.

  • 클라이언트 명령을 실행했을 때
  • Redis 가 메모리를 체크했을 때 현재 사용량이 maxmemory 설정보다 높을 때
  • 새로운 커맨드가 실행되었을 때

보통 Redis 에서 Eviction 은 server.c 에서 processCommand 함수 안에서 performEvictions 함수를 호출합니다.

    if (server.maxmemory && !scriptIsTimedout()) {
        int out_of_memory = (performEvictions() == EVICT_FAIL);

        /* performEvictions may evict keys, so we need flush pending tracking
         * invalidation keys. If we don't do this, we may get an invalidation
         * message after we perform operation on the key, where in fact this
         * message belongs to the old value of the key before it gets evicted.*/
        trackingHandlePendingKeyInvalidations();

        /* performEvictions may flush slave output buffers. This may result
         * in a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand &&
            c->cmd->proc != quitCommand &&
            c->cmd->proc != resetCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * until first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand ||
            c->cmd->proc == evalShaCommand ||
            c->cmd->proc == fcallCommand ||
            c->cmd->proc == fcallroCommand)
        {
            server.script_oom = out_of_memory;
        }
    }

evict.c 의 performEvictions 함수는 다음과 같습니다. performEvictions 함수는 상당히 길고 복잡한 동작을 진행합니다.

int performEvictions(void) {
    /* Note, we don't goto update_metrics here because this check skips eviction
     * as if it wasn't triggered. it's a fake EVICT_OK. */
    if (!isSafeToPerformEvictions()) return EVICT_OK;

    int keys_freed = 0;
    size_t mem_reported, mem_tofree;
    long long mem_freed; /* May be negative */
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = EVICT_FAIL;

    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) {
        result = EVICT_OK;
        goto update_metrics;
    }

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
        result = EVICT_FAIL;  /* We need to free memory, but policy forbids. */
        goto update_metrics;
    }

    unsigned long eviction_time_limit_us = evictionTimeLimitUs();

    mem_freed = 0;

    latencyStartMonitor(latency);

    monotime evictionTimer;
    elapsedStart(&evictionTimer);

    /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory'
     * so we have to back-up and restore server.core_propagates. */
    int prev_core_propagates = server.core_propagates;
    serverAssert(server.also_propagate.numops == 0);
    server.core_propagates = 1;
    server.propagate_no_multi = 1;

    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[bestdbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[bestdbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
            decrRefCount(keyobj);
            keys_freed++;

            if (keys_freed % 16 == 0) {
                /* When the memory to free starts to be big enough, we may
                 * start spending so much time here that is impossible to
                 * deliver data to the replicas fast enough, so we force the
                 * transmission here inside the loop. */
                if (slaves) flushSlavesOutputBuffers();

                /* Normally our stop condition is the ability to release
                 * a fixed, pre-computed amount of memory. However when we
                 * are deleting objects in another thread, it's better to
                 * check, from time to time, if we already reached our target
                 * memory, since the "mem_freed" amount is computed only
                 * across the dbAsyncDelete() call, while the thread can
                 * release the memory all the time. */
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }

                /* After some time, exit the loop early - even if memory limit
                 * hasn't been reached.  If we suddenly need to free a lot of
                 * memory, don't want to spend too much time here.  */
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    // We still need to free memory - start eviction timer proc
                    startEvictionTimeProc();
                    break;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    /* at this point, the memory is OK, or we have reached the time limit */
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;

cant_free:
    if (result == EVICT_FAIL) {
        /* At this point, we have run out of evictable items.  It's possible
         * that some items are being freed in the lazyfree thread.  Perform a
         * short wait here if such jobs exist, but don't wait long.  */
        if (bioPendingJobsOfType(BIO_LAZY_FREE)) {
            usleep(eviction_time_limit_us);
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
            }
        }
    }

    serverAssert(server.core_propagates); /* This function should not be re-entrant */

    /* Propagate all DELs */
    propagatePendingCommands();

    server.core_propagates = prev_core_propagates;
    server.propagate_no_multi = 0;

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);

update_metrics:
    if (result == EVICT_RUNNING || result == EVICT_FAIL) {
        if (server.stat_last_eviction_exceeded_time == 0)
            elapsedStart(&server.stat_last_eviction_exceeded_time);
    } else if (result == EVICT_OK) {
        if (server.stat_last_eviction_exceeded_time != 0) {
            server.stat_total_eviction_exceeded_time += elapsedUs(server.stat_last_eviction_exceeded_time);
            server.stat_last_eviction_exceeded_time = 0;
        }
    }
    return result;
}

메모리가 충분히 확보가 될 때까지 반복하면서, 먼저 eviction 을 위한 bestKey를 찾고 이것을 eviction 하게 됩니다. 처음에는 evictionPoolPopulate 라는 함수를 통해서 eviction pool 에 샘플링 데이터를 추가하게 됩니다. 샘플링 방법은 랜덤하게 데이터를 가져와서 idle time 을 구하고 여기서 이 idle 값을 가지고 eviction pool을 채우게 됩니다.

/* This is a helper function for performEvictions(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time bigger than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. */

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where an higher score means better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency subtracting the actual frequency to the maximum
             * frequency of 255. */
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert.  */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */

                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimization bla bla bla. */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}

그럼 이제 eviciton 동작은 알았는데, evcition 을 위한 값은 언제 설정하게 될까요? Redis 내부에는 실제 정보를 가지는 redisObject 라는 타입이 있습니다. LRU_BITS 는 24bits 입니다.

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

그럼 이 LRU 값은 언제 업데이트가 될까요? 실제로 db.c 파일의 lookupKey 를 보면 거기서 접근할 때 또는 생성시에 LRU_CLOCK이라는 함수를 통해서 업데이트가 되어집니다.

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    robj *val = NULL;
    if (de) {
        val = dictGetVal(de);
        int force_delete_expired = flags & LOOKUP_WRITE;
        if (force_delete_expired) {
            /* Forcing deletion of expired keys on a replica makes the replica
             * inconsistent with the master. The reason it's allowed for write
             * commands is to make writable replicas behave consistently. It
             * shall not be used in readonly commands. Modules are accepted so
             * that we don't break old modules. */
            client *c = server.in_script ? scriptGetClient() : server.current_client;
            serverAssert(!c || !c->cmd || (c->cmd->flags & (CMD_WRITE|CMD_MODULE)));
        }
        if (expireIfNeeded(db, key, force_delete_expired)) {
            /* The key is no longer valid. */
            val = NULL;
        }
    }

    if (val) {
        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }

        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_hits++;
        /* TODO: Use separate hits stats for WRITE */
    } else {
        if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_misses++;
        /* TODO: Use separate misses stats and notify event for WRITE */
    }

    return val;
}

그리고 LRU_CLOCK 은 다음과 같이 구현되어 있습니다.

/* Return the LRU clock, based on the clock resolution. This is a time
 * in a reduced-bits format that can be used to set and check the
 * object->lru field of redisObject structures. */
unsigned int getLRUClock(void) {
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

/* This function is used to obtain the current LRU clock.
 * If the current resolution is lower than the frequency we refresh the
 * LRU clock (as it should be in production servers) we return the
 * precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
    unsigned int lruclock;
    if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
        atomicGet(server.lruclock,lruclock);
    } else {
        lruclock = getLRUClock();
    }
    return lruclock;
}

그런데 제가 위에서 Redis LRU는 샘플링을 해서 데이터를 가져와서 이를 처리한다고 했습니다. evictionPoolPopulate 함수였죠. 그래서 Redis의 LRU는 엄밀한 LRU가 아니라 Approximated LRU algorithm을 구현하고 있습니다. 즉 LRU를 위해서 Best Candidate 을 구하는 것이 아니라, 적당히 구해온다는 것입니다.(Redis 는 그냥 Sampling Key를 통해서 가져옵니다.)

Redis 2.x(지금은 6.0 시대!!!) 에서 Redis 3.0으로 넘어가면서 좀 더 실제에 가까운 LRU 알고리즘으로 구현이 바뀌었다고 합니다.(지금 코드는 6.0 기반이라… 좀 더 개선이?) 아래 그림에 대해서 다음과 같이 설명이 되어있습니다.

  • 밝은 회색 밴드는 Eviction 된 object
  • 회색 밴드는 Evcition 되지 않은 object
  • 녹색 밴드는 추가된 object

sampling이 5로 되어있는 것을 10으로 바꾸면 좀 더 좋은 결과를 보여주는데, 이 때 CPU 사용량이 더 늘어난다고 합니다.

LFU MODE

Redis 4.0 부터는 LFU(Least Frequently Used Eviction Mode) 가 추가되었습니다. LRU에 비해서 좀 더 좋은 성능을 보여준다고 하는데요. 이 부분은 다음번에 추가로 설명을 드리도록 하겠습니다. 참고로 hot keys 를 보기위해서는 LFU 모드를 사용해야만 하고 object freq <key> 명령을 통해서 hot key를 대략적으로 찾을 수 있습니다.

Reference

[입 개발] AWS VPC Peering

AWS VPC Peering 을 위해서 체크해야 하는 부분

  1. A VPC 에서 B VPC로 VPC Peering 신청(다른 계정의 VPC도 가능)
  2. B VPC에서는 VPC Peering 수락
  3. B VPC와 통신해야 하는 A VPC의 모든 Subnet 에 B VPC로의 routing 설정
    1. 해당 Subnet 에 할당된 Route Table 들에 추가해 주면 된다.
    2. 항목은 Peering 선택
    3. 예를 들어 A VPC private, public, db 등의 3개의 subnet 이 있을 경우, A private 에서 B private 로만 접근이 필요하면 A private subnect 과 연결된 route table 에 B VPC Peer로 향한 라우팅 정보를 추가하면 된다.
  4. A VPC와 통신해야 하는 B VPC의 모든 Subnect 에 A VPC로의 routing 설정
  5. 각기 접근 가능한 Security Group 설정을 해주어야 한다. 서로 접근이 가능하도록

이는 AWS에서 VPC Peering 은 VPC 단위로 라우팅은 subnet 단위로 이루어지기 때문이다.

[입 개발] MariaDB Connector 와 AWS Aurora

먼저 저는 자바를 잘 모르고 AWS Aurora도 잘 모르고 MariaDB Connector도 잘 모르는 초초보에 자바맹인것을 먼저 밝히고 해당 글을 작성합니다.

지인 분의 서비스가 Aurora RDS Mysql 을 쓰다가 Failover 를 한다고 해서 뭔가 잘못된 정보를 드렸다가, 자세히 보다보니, AWS Aurora의 Manual Scale Up을 하기 위해서는 다음과 같이 하면 된다고 합니다.

https://svrlove.blogspot.com/2019/05/aws-aurora-rds.html

요약하면, Reader를 먼저 Scale Up 하고 Failover 하면 된다고, 그냥 하면 될꺼라고 알려드려서 죄송합니다. 흑…

그런데 그 얘기를 듣고 나서, 다시 들은 얘기가 select 가 Reader로 가고 있다라는 것이었습니다. 이게 제가 듣기에는 완전히 이상한게… Connector 가 단순히 자동으로 Write는 Primary에게, Read는 Replica 에게 쿼리를 전달해 준다면, 굉장히 편리하긴 한데(완전히 다 좋은 건 아니지만…), 이렇게 될 경우 다음과 같이 Replication Lag가 발생하면, Write 후에 Read를 할 경우 새롭게 Write한 데이터가 아니라, 과거의 데이터를 읽을 수 있습니다. 다음과 같이 Primary DB에 A,B가 저장되면 이것은 Replica 에 다시 A,B가 저장되게 되는데 만약 Replica의 처리가 늦어져서 A, B가 아직 Replica에 반영되지 않았다면, 우리가 A를 select 하더라도 Replica에서는 데이터가 없다고 나올 것입니다.

그래서 이런 부분은 설정으로 명시적으로 이루어져야 사용자의 실수할 여지가 줄어들게 됩니다. 그런데 지인 분의 서버는 Select를 자동으로 Replica에서 했다고 하는 것입니다. 아무런 설정 없이…

그래서 먼저 관련 자료를 찾다보니, 인프런의 CTO 이신 창천항로님의 글이 보입니다. 일단 먼저 읽어보시길 권장합니다.

여길 보면 Aurora를 쓰면 @Transactional(readOnly=true) 를 붙여주면 자동으로 Reader 로 연결이 된다는 것입니다. 이걸 보고 제가 외친 한마디는 “이게 말이되?”, 어떻게 Spring JPA 쪽에서 Connection 정보를 알아서 연결이 됨? 이라는 의문에 쌓이게 되었습니다.

기본적으로 JPA에서 DataSource를 하나만 만들어주고 readOnly=true 가 셋팅이 되면 Connection에 Read Only를 설정하는 것으로 알고 있습니다.(그래봤자… DataSource 가 하나면 하나만 와야… 엉?)

그래서 일단 MariaDB Connector 소스를 받아봤습니다.(이때, 좀 더 조사부터 했어야 하는데… 내가 미…) MariaDB Connector는 다음 github에서 받을 수 있습니다. (https://github.com/mariadb-corporation/mariadb-connector-j)

그리고 열심히 소스를 보는데…(솔직히… 봐도 모르는…), 그러다가 먼저 CHANGELOG.md 파일을 보니 Aurora로 기능이 들어온 것들이 있습니다. 그중에 Relesae 1.5.1 관련해서 CONJ-325의 제목이 Aurora host auto-discovery 입니다. 이거다 싶어서 Aurora로 소스 코드를 검색해도 아무런 내용이 없습니다.

그래서 삽질로 spring-cloud-aws 의 JDBC 도 보러 갔다가… 아닌듯하여… 돌아오게 되는… 흑…(몰라~~~ 알수가 없어…)

그런데 지인분이 다음 링크를 던져주면서, LoadBalacing 기능이 Connector에 있다는 것을 알게 되었습니다.(https://mariadb.com/kb/en/failover-and-high-availability-with-mariadb-connector-j/?fbclid=IwAR2EnwLRBGc1T0bQLJTloP9WnisrjM0smV2h4bGa23UcT9Teq55gYVkwctI)

아래 내용을 보시면 이렇게 호스트 주소를 여러개 적어두면, 첫번째는 Primary, 나머지는 Replica로 동작한다는 것을 알 수 있고, @Transactional(readOnly=true) 를 설정하면 readOnly 용 Connection을 가져가서 아까 말한 것과 유사한 상황이 발생한다는 것을 알게 되었습니다.

여기까지 보면, 사실 눈치챈 분들도 계시겠지만, 저는 아주 초초보라, “jdbc:mysql://host/test” 이런 형태의 endpoint 만 사용해 보아서 사실… 전혀 눈치를 못챘던…

그런데, 이제 지인분이 다시 얘기해주시는 게 보이는데, connection url 에 “aurora” 가 포함되어 있다고 하시는 겁니다. 엉, 이게 뭐야… 하면서 위의 endpoint url을 다시보니… “replication” 이라는게 포함되어 있습니다. 그리고 다시 창천항로 님의 글을 보니… 거기도 jdbc url 이 “jdbc:mysql:aurora:…..” 이런식으로 되어있습니다.

그런데 최신 소스에서 아무리 검색해도 aurora라는 파일이나 String이 존재하지가 않습니다.(CHANGLELOG.md 제외) 그런데 replication 으로 검색을 해보니, HaMode.java(./src/main/java/org/mariadb/jdbc/export/HaMode.java) 라는 파일이 발견이 됩니다. 아래 내용을 보면 뭔가 Ha 관련 설정이 있는 것이 보입니다. 위의 jdbc url 의 HaMode 와 관련해서 뭔가 설정을 할 것 같은 부분이 보이는 거죠.

public enum HaMode {
  REPLICATION("replication") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      return HaMode.getAvailableHostInOrder(hostAddresses, denyList, primary);
    }
  },
  SEQUENTIAL("sequential") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      return getAvailableHostInOrder(hostAddresses, denyList, primary);
    }
  },
  LOADBALANCE("load-balance") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      // use in order not blacklisted server
      List<HostAddress> loopAddress = new ArrayList<>(hostAddresses);
      loopAddress.removeAll(denyList.keySet());
      Collections.shuffle(loopAddress);

      return loopAddress.stream().filter(e -> e.primary == primary).findFirst();
    }
  },

그리고 HaMode.java 에서 from이란 함수를 보면 다음과 같습니다. 이게 분명히 aurora 라는 문자가 있으면 HaMode 로 바꿔줘야 하는데 그런게 없네요.

  public static HaMode from(String value) {
    for (HaMode haMode : values()) {
      if (haMode.value.equalsIgnoreCase(value) || haMode.name().equalsIgnoreCase(value)) {
        return haMode;
      }
    }
    throw new IllegalArgumentException(
        String.format("Wrong argument value '%s' for HaMode", value));
  }

그래서 해당 github에서 해당 파일의 History를 뒤지다 보니 뭔가 이상합니다. 파일의 기록이 특정 시점 이전이 없는… -_- 그리고 버전을 보니 master branch 는 3.0.x 입니다. 그런데 주변에서 많이 사용하는 건 2.7.x네요. 급히 소스를 바꾸고 검색을 해보니… HaMode.java 가 옮겨졌는데 다음 위치에 있습니다. 우리가 찾던 AURORA가 있는 것이 보입니다. (./src/main/java/org/mariadb/jdbc/internal/util/constant/HaMode.java)

public enum HaMode {
  AURORA,
  REPLICATION,
  SEQUENTIAL,
  LOADBALANCE,
  NONE
}

그럼 관련 호스트 목록은 어디서 가져오는가? 2.7.x대에는 ./src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java 라는 파일이 존재하고 AuroraListener 에서 information_schema.replica_host_status 테이블에서 server_id에서 endpoint를 생성할 수 있습니다.

  private List<String> getCurrentEndpointIdentifiers(Protocol protocol) throws SQLException {
    List<String> endpoints = new ArrayList<>();
    try {
      proxy.lock.lock();
      try {
        // Deleted instance may remain in db for 24 hours so ignoring instances that have had no
        // change
        // for 3 minutes
        Results results = new Results();
        protocol.executeQuery(
            false,
            results,
            "select server_id, session_id from information_schema.replica_host_status "
                + "where last_update_timestamp > now() - INTERVAL 3 MINUTE");
        results.commandEnd();
        ResultSet resultSet = results.getResultSet();

        while (resultSet.next()) {
          endpoints.add(resultSet.getString(1) + "." + clusterDnsSuffix);
        }

        // randomize order for distributed load-balancing
        Collections.shuffle(endpoints);

      } finally {
        proxy.lock.unlock();
      }
    } catch (SQLException qe) {
      logger.warning("SQL exception occurred: " + qe.getMessage());
      if (protocol.getProxy().hasToHandleFailover(qe)) {
        if (masterProtocol == null || masterProtocol.equals(protocol)) {
          setMasterHostFail();
        } else if (secondaryProtocol.equals(protocol)) {
          setSecondaryHostFail();
        }
        addToBlacklist(protocol.getHostAddress());
        reconnectFailedConnection(new SearchFilter(isMasterHostFail(), isSecondaryHostFail()));
      }
    }

    return endpoints;
  }

그래서 결론부터 말하자면 jdbc url 의 endpoint 에 “jdbc:mysql:aurora” 라고 설정을 하면 mariadb connector 가 내부적으로 저런 작업을 다 해주게 됩니다. 그게 아니라면 HaMode의 다른 값을 보고 적절히 설정하시면 원하는 형태로 나눌 수 가 있게 됩니다. 코드를 대충 보셨으니 저 값을 쓰면 어떻게 되겠구나라고 보시면 될듯 합니다. 참고로 저런 정책을 파싱하는 부분은 ./src/main/java/org/mariadb/jdbc/UrlParser.java 를 보시면 잘 나와있습니다.

즉 aurora를 안 붙이면, 저렇게 동작하지 않는다는 얘기…

그런데… One More Thing…

MariaDB Connector 가 있고, Mysql Connector 가 있습니다. Mysql Connector는 과연 같은 걸 지원할까요? 그래서 Mysql Connector 소스도 까 보았는데, 재밌는건 “jdbc:mysql:[replication|loadbalance|failover]” 는 공통으로 지원이 됩니다. 그런데 Aurora는 안보이더군요.

사실 이 두 개의 Driver를 바꾸면 다음과 같이 Timestamp나 여러가지 다른 이슈들이 있을 수 있으므로 확인이 필요합니다.

다 끝나고 나서, 좋은 레퍼런스들이 나오기 시작했는데, 기계인간 이종립 님의 문서도 좋았습니다.

솔직히 저는 저런 문법을 처음 보았기 때문에 흑… 이해를 잘 못했는데, 다른 분들은 이미 다 아실듯한…

참고로 최종적으로 @Transactional(readOnly=True) 일 경우 Connection.setReadOnly를 호출하게 되는데, src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java 를 보게 되면 해당 함수가 호출되면 다음과 같은 작업을 하게 됩니다.

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      ......
      case METHOD_SET_READ_ONLY:
        this.listener.switchReadOnlyConnection((Boolean) args[0]);
      case METHOD_GET_READ_ONLY:
        return this.listener.isReadOnly();
      ......
   }

위의 코드를 보면 listener의 switchReadOnlyConnection 를 호출하는 것을 보실 수 있습니다.

switchReadOnlyConnection 은 Connection의 종류에 따라서 달라지는데, MasterFailoverListener 이냐, MasterReplicasListener.java 냐에 따라서 다르게 구현되어 있습니다.

MasterReplicasListener가 우리가 원하는 read-only(secondary)로 바꾸어 주는 부분이므로 src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersReplicasListener.java 를 확인해봅니다.

  /**
   * Switch to a read-only(secondary) or read and write connection(master).
   *
   * @param mustBeReadOnly the read-only status asked
   * @throws SQLException if operation hasn't change protocol
   */
  @Override
  public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException {
    checkWaitingConnection();
    if (currentReadOnlyAsked != mustBeReadOnly) {
      proxy.lock.lock();
      try {
        // another thread updated state
        if (currentReadOnlyAsked == mustBeReadOnly) {
          return;
        }
        currentReadOnlyAsked = mustBeReadOnly;
        if (currentReadOnlyAsked) {
          if (currentProtocol == null) {
            // switching to secondary connection
            currentProtocol = this.secondaryProtocol;
          } else if (currentProtocol.isMasterConnection()) {
            // must change to replica connection
            if (!isSecondaryHostFail()) {
              try {
                // switching to secondary connection
                syncConnection(this.masterProtocol, this.secondaryProtocol);
                currentProtocol = this.secondaryProtocol;
                // current connection is now secondary
                return;
              } catch (SQLException e) {
                // switching to secondary connection failed
                if (setSecondaryHostFail()) {
                  addToBlacklist(secondaryProtocol.getHostAddress());
                }
              }
            }
            // stay on master connection, since replica connection is fail
            FailoverLoop.addListener(this);
          }
        } else {
          if (currentProtocol == null) {
            // switching to master connection
            currentProtocol = this.masterProtocol;
          } else if (!currentProtocol.isMasterConnection()) {
            // must change to master connection
            if (!isMasterHostFail()) {
              try {
                // switching to master connection
                syncConnection(this.secondaryProtocol, this.masterProtocol);
                currentProtocol = this.masterProtocol;
                // current connection is now master
                return;
              } catch (SQLException e) {
                // switching to master connection failed
                if (setMasterHostFail()) {
                  addToBlacklist(masterProtocol.getHostAddress());
                }
              }
            } else if (urlParser.getOptions().allowMasterDownConnection) {
              currentProtocol = null;
              return;
            }

            try {
              reconnectFailedConnection(new SearchFilter(true, false));
              handleFailLoop();

            } catch (SQLException e) {
              // stop failover, since we will throw a connection exception that will close the
              // connection.
              FailoverLoop.removeListener(this);
              HostAddress failHost =
                  (this.masterProtocol != null) ? this.masterProtocol.getHostAddress() : null;
              throwFailoverMessage(
                  failHost, true, new SQLException("master connection failed"), false);
            }

            if (!isMasterHostFail()) {
              // connection established, no need to send Exception !
              // switching to master connection
              try {
                syncConnection(this.secondaryProtocol, this.masterProtocol);
                currentProtocol = this.masterProtocol;
              } catch (SQLException e) {
                // switching to master connection failed
                if (setMasterHostFail()) {
                  addToBlacklist(masterProtocol.getHostAddress());
                }
              }
            } else {
              currentReadOnlyAsked = !mustBeReadOnly;
              HostAddress failHost =
                  (this.masterProtocol != null) ? this.masterProtocol.getHostAddress() : null;
              throwFailoverMessage(
                  failHost, true, new SQLException("master connection failed"), false);
            }
          }
        }
      } finally {
        proxy.lock.unlock();
      }
    }
  }