[입 개발] bits로 시간을 얼마나 표현할 수 있을까?

갑자기 bit를 얼마나 할당하는가에 따라서 얼만큼의 시간을 표현할 수 있을까라는 생각이 들었습니다. 흔히 unix timestamp 라고 해서 1970년 1월 1일 0시 부터 현재까지를 Second 단위로 표현한 것입니다. unix timestamp 는 4 bytes 로 표현되고, 이를 2038년 1월 19일 03:14:07 에 overflow 가 난다고 합니다. 그럼 이를 어떻게 계산할 수 있을까요? 사실 원리는 간단합니다. 일단 다음과 같이 시간을 정리해 봅시다. (윤달은 일단 빼고 계산합니다.)

1 minute60 seconds
1 hour60 * 60 = 3600 seconds
1 day60 * 60 * 24 = 86400 seconds
1 weak60 * 60 * 24 * 7= 604800 seconds
1 year60 * 60 * 24 * 365 = 31536000 seconds

이제 이런 시간 테이블이 있으니 생각해보면 4 bytes는 32 bits 이므로 2^32 승입니다. 이것을 간단히 구하면 다음과 같습니다.

>>> pow(2, 32) / 60 / 60 / 24
49710.26962962963
>>> pow(2, 32) / 60 / 60 / 24 / 365
136.19251953323186

4 bytes 로는 Day 로는 49710일, Year 로는 136년을 커버할 수 있습니다. 이러면 1970 + 136 해서 2106년이 나옵니다. 뭔가 이상하지 않나요? 위에서 제가 2038년에 1월 19일 이라고 했는데, 왜 2106년이 나올까요? 사실 이건 간단합니다. unix timestamp 는 time_t(부호가 있는 4 bytes)를 이용하기 때문입니다. 그래서 실제로 32 bits가 아니라 31 bits만 사용하게 됩니다. 31bit 로 계산하면 다음과 같이 68년 1970 + 68년 해서 2038년에 만료가 되는 것입니다.

>>> pow(2, 31) / 60 / 60 / 24
24855.134814814814
>>> pow(2, 31) / 60 / 60 / 24 / 365
68.09625976661593

그렇다면 이제 이걸 위해서 시간 값을 사용하면 bit를 얼마나 할당해야 할까요? unix timestamp 는 seconds 단위지만, 현재는 milliseconds 단위는 최소 커버해야 합니다. 여기서 공식은 다음과 같습니다.

>>> pow(2, N) / 1000 / 60 / 60 / 24 / 365

계산해보면 다음과 같습니다.

BitsYear
37 bits4.35
38 bits8.71
39 bits17.43
40 bits34.86
41 bits69.73
42 bits139.46
43 bits278.92

보통 자신만의 unique key를 만들기 위해서 시간 값을 집어넣는데, 현재 시간을 epoch을 잡는다면 위와 같이 사용이 가능합니다. 서비스가 얼마나 갈지는 모르지만, 최소 39 bit 는 되어야 key 사용이 가능할듯 하네요. 안전하게 할려면 40 bit 이상은 가야할듯 합니다. twitter 의 snowflake 는 42 bit를 사용하고 있습니다.

[입개발] Spark Kafka Streaming 에서의 BackPressure 에 대한 아주 간단한 정리.

Kafka Streaming 처리를 할 때, 자주 문제가 되는 부분은 보통 다음과 같은 것들이 있습니다.

  • 잘못된 입력의 데이터
  • 갑자기 들어온 많은 양의 데이터

잘못된 입력의 데이터는 각자 잘 알아서 처리를 하면되지만? 갑자기 들어오는 많은 양의 데이터는 어떻게 해야할까요? 간단한 방법은 처리하는 서버 대수를 늘리는 것입니다. 그런데 평소에는 2대면 충분한데, 피크때는 10대 정도가 필요한 상황이면 어떻게 해야할까요? 방법은 간단합니다. 그냥 항상 10대를 돌리면 됩니다. 돈만 많다면…(그러나 저는 거지입니다. T.T)

두번째 방법은 Spark의 Auto Scaling 을 이용하는 방법입니다. spark.streaming.dynamicAllocation 설정을 이용해서 Scaling이 가능합니다. 아래와 같은 설정을 이용합니다.(이 방법은 DStream을 사용할때만 유용하고, Structured Streaming 하고는 상관없을 수 있습니다.)

spark.dynamicAllocation.enabled=false
spark.streaming.dynamicAllocation.enabled=true
spark.streaming.dynamicAllocation.minExecutors=2
spark.streaming.dynamicAllocation.maxExecutors=8

#default
spark.streaming.dynamicAllocation.scalingInterval=60
spark.streaming.dynamicAllocation.scaleUpRatio=0.9
spark.streaming.dynamicAllocation.scaleDownRatio=0.3

원래 spark의 Dynamic Resource Allocation(DRA)이 Streaming 과는 상성이 맞지 않았습니다. Spark의 DRA는 Idle Timedㅡㄹ 체크하는데 Spark의 Streaming은 Micro 배치라 계속 일정시간 마다 작업을 하게되어서 Executor를 놓지 않아서, 늘어는 나도 줄어들지 않게 됩니다.

그래서 Spark 2.x에 나온것이 spark.streaming.dynamicAllocation 이 생겼습니다. spark.streaming.dynamicAllocation 정책은 처음부터 리소스를 전부 가져가지 않고, 다음과 같이 ratio를 계산해서 이를 이용하게 됩니다.

ratio = processing time / processing interval
  • ratio > spark.streaming.dynamicAllocation.scaleUpRatio => executor 1개 추가
  • ratio < spark.streaming.dynamicAllocation.scaleDownRatio => executor 1개 제거

이 값이 minExecutors, maxExecutors 설정의 최대치 까지 적용이 됩니다. 다만 이처리는 ExecutorAllocationManager 에서 처리되는데, 하나의 배치 안에서 증가/추가되지 않고, onBatchCompleted 가 호출될때 마다 실행시간 정보가 반영되어서 이번 Micro 배치가 끝나고 다음번에 영향을 주게 됩니다. 즉 하나의 배치 타이밍만에서만 크게 생기면 효과를 보기 힘들지만, 특정 시간동안 이슈가 있다면 서서히 증가해서 서서히 줄어들게 됩니다.

spark streaming dynamicAllocation을 쓸려면 기존의 spark.dynamicAllocation.enabled=false로 설정해야 합니다.

이렇게 Auto Scaling을 설정하면 모든게 해결될 것 같지만, 그렇지 않습니다. 일단 대부분의 Auto Scaling은 min/max 설정이 있습니다. 이 얘기는 해당 수준보다 더 데이터가 들어오면 결국은 뭔가 데이터 처리에 문제가 장애가 발생할 수 있다는 얘기가 됩니다. 그리고 비용도 더 든다는 얘깁니다. 그리고 또 문제가 되는것은 위의 Auto Scaling 설정도 위의 설정이 반영되는 것은, 엄청 많은 데이터가 들어온다면, 최소한 한번 그 많은 양을 처리해야 합니다.

그래서 이제 도리어 생각을 살짝 바꿔보면, 그냥 아주 느리게 평소에 처리할 수 있는 양 까지만 처리하면 되지 않을까라는 생각을 할 수 있습니다. 그래서 일정 수준까지만 처리하겠다라는 개념이 BackPressure 입니다.

보통 Spark에서 Kafka Streaming 에서 배치 때 처리하는 양은 지난번 처리한 마지막 offset 에서 현재 마지막 offset 까지를 가져와서 처리를 하게 됩니다. 즉, 한번에 1000개씩 처리하던 배치가 1분만에 10000 개가 들어오면, 그 배치 기간에는 10000개를 처리하게 됩니다. 이런 경우가 발생하는 경우가 다음과 같은 두 가지 경우가 있습니다.

  • offset 정책이 earliest 면서 offset 정보가 없어서 처음으로 시작될 때
  • 장애나, 트래픽이 늘어나서, 마지막 처리한 offset 과 현재의 offset 정보 차이가 많은 경우.

BackPressure를 설정하면, 딱 그만큼만 계속 가져오게 됩니다. 즉 1분마다 처리되는 Streaming 인데, BackPressure가 1000으로 설정되면 1분마다 1000개씩만 처리하게 됩니다. 전체 처리는 늦어지지만, 처리하는 입장에서는 부하가 늘어나지 않습니다. Spark 에서 Kafka Streaming은 DStream 과 Structured Streaming에서 셋팅 방법이 좀 다릅니다.(자세한 옵션을 다루지는 않습니다.)

DStream에서의 설정

spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=100000
spark.streaming.receiver.maxRate=100000
spark.streaming.kafka.maxRatePerPartition=20000

Structured Streaming 에서의 설정

코드에서 추가해야 합니다.

option("maxOffsetsPerTrigger", "100000")

Structured Streaming에서 주의할 점

Structured Streaming 에서는 checkpoint를 spark에서 관리를 하면서 다음 Offsets을 미리 만들어두고 진행하게 됩니다. 이게 생기게 되면, 해당 Offset 을 가져오려고 시도를 하고 maxOffsetsPerTrigger가 이 다음에 설정되면 해당 배치에는 적용이 되지 않고 그 다음 배치부터 적용되게 됩니다.(이 때는 최종 commit 된 다음 번호의 offset을 지워주면 됩니다.)

다음 offset 을 계산하는 방법에서 RateLimit을 설정해두는데, 이게 항상 현재 Kafka의 마지막 Offset이 되게 됩니다. 그런데 maxOffsetsPerTrigger 가 설정되면, RateLimit 가 해당 값으로 설정되서, 다음 Offset 이 현재 처리된 Offset + rateLImit 로 설정이 됩니다. 이를 이용해서 일종의 BackPressure가 설정이 되게 되는 것입니다. 해당 내용은 이전 블로그를 참고하세요. [입 개발] Spark Structured Streaming 에서 Offset 은 어떻게 관리되는가(아주 간략한 버전)? | Charsyam’s Blog (wordpress.com)

그런데 DStream 방식과 Structured Streaming에서 방식이 달라지는데, 이와 관련해서 정리를 하다보니 [SPARK-24815] Structured Streaming should support dynamic allocation – ASF JIRA (apache.org) 현재 dynamic allocation이 Structured Streaming 하고는 뭔가 어울리지 않는 것 같습니다. 그런데 또 코드를 보면 설정이 상관이 없는건 아닌듯 한… 코드를 보면 BackPressure 설정도 maxOffsetPerTrigger 는 kafka에만 존재하는 값이라 external/kafka-0-10-sql 안에 존재하고 있습니다.

뭔가… 정리하다 보니 이상하게 끝나버리지만… 대충 이렇습니다.

[입 개발] Redis가 maxmemory 보다 더 썼다가 OOM 에러를 던져요!!!

Redis를 운영하다보면 항상 어려운 문제는 memory 관리입니다. In-Memory Cache 다 보니, 메모리 보다 더 많은 데이터를 써서 swap이 발생하면 해당 메모리 page를 접근할 때 마다 swap out 이 발생해서, 속도에 엄청난 영향을 주게 됩니다. 또한 더 많은 메모리를 쓰면, 메모리 문제로 장애가 발생할 수 가 있습니다. 그런데 오늘 팀에서 다음과 같은 에러가 발생했다고 보고를 해주셨습니다.

redis.clients.jedis.exceptions.JedisDataException: OOM command not allowed when used memory > 'maxmemory'.

Redis는 이런 메모리 관리를 위해서 두 가지 옵션을 제공하고 있습니다. 첫 번째는 maxmemory 설정이고 두 번째는 maxmemory policy 입니다.

maxmemory는 메모리를 이것 이상 사용하지 않도록 설정하는 옵션입니다. 내부적으로 메모리 할당에 zmalloc 이라는 함수를 이용하는데, 이 안에서 메모리 할당 사이즈를 계산하고 이를 이용합니다. 다만 Redis에서 할당을 요청하는 값이기 때문에, 실제 메모리 page는 일반적으로 더 사용하게 되고 이로 인해 실제 물리 메모리 사용량은 계산한 값보다 훨씬 더 많이 사용하고 있을 수 있습니다. 이를 통해서 필요할 때, 사용하던 메모리를 반납해서 메모리 용량을 확보하게 되는데 이를 eviction 이라고 합니다.

maxmemory-policy 는 메모리 사용량이 maxmemory 보다 커졌을 때 어떤 정책을 취할 것인가를 정해둔 정책입니다. 다음과 같은 정책 값들이 있습니다.

정책명내용
noevictioneviction 작업을 하지 않고, 바로 write 작업시에 에러를 리턴한다.
volatile-lruexpire set(expire를 건 키집합) 에 대해 LRU 방식을 이용해서 키를 제거한다. 정확한 LRU 방식이 아니라 유사한 방식을 사용
volatile-lfuexpire set 에 대해 LFU 유사한 방식을 이용해서 키를 제거한다.
volatile-randomexpire set 에 대해 랜덤하게 키를 제거한다.
volatile-ttlexpire set에 대해서 ttl이 적게 남은 순으로 키를 제거한다.
allkeys-lru모든 키에 대해서 LRU 유사한 방식으로 키를 제거한다.
allkeys-lfu모든 키에 대해서 LFU 유사한 방식으로 키를 제거한다.
allkeys-random모든 키에 대해서 랜덤하게 키를 제거한다.

여기서 LRU는 Least Recently Used, LFU는 Least Frequently Used 입니다.

각각 conf 에 다음과 같이 설정이 가능합니다.

maxmemory <bytes>
maxmemory-policy <policy>

그럼 Redis 는 언제 이 eviction을 실행하려고 할까요? eviction이 발생하는 사용자가 Command를 실행하려고 할 때 입니다. 만약 현재 사용중인 메모리(used memory)가 maxmemory 설정보다 크다면 발생하게 됩니다. 먼저 processCommand 함수를 살펴봅시다.

int processCommand(client *c) {
    ......
    int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
                             (c->cmd->proc == execCommand && 
                             (c->mstate.cmd_flags & CMD_DENYOOM));

    ......
        /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = (performEvictions() == EVICT_FAIL);
        /* performEvictions may flush slave output buffers. This may result
         * in a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand &&
            c->cmd->proc != resetCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * until first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
            server.lua_oom = out_of_memory;
        }
    }
    ......
}

위의 코드를 살펴보면 server.maxmemory 값이 설정되어 있고, server.lua_timeout 이 0이어야만 실행이 되게 되어있습니다. 여기서 out_of_memory 변수에 performEDvcitons() 의 결과 값이 EVICT_FAIL 인지 체크하고 해당 이슈가 실패하면, 해당 command 가 is_denyoom_command 인지를 체크해서 is_denyoom_command 라면, shared.oomerr 를 리턴하게 됩니다.

shared.oomerr 는 다음과 같이 정의되어 있습니다. 아까 어디선가 본듯한 문구이죠?

    shared.oomerr = createObject(OBJ_STRING,sdsnew(
        "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));

그럼 이제 performEvictions 함수를 살펴봅시다.

/* Check that memory usage is within the current "maxmemory" limit.  If over
 * "maxmemory", attempt to free memory by evicting data (if it's safe to do so).
 *
 * It's possible for Redis to suddenly be significantly over the "maxmemory"
 * setting.  This can happen if there is a large allocation (like a hash table
 * resize) or even if the "maxmemory" setting is manually adjusted.  Because of
 * this, it's important to evict for a managed period of time - otherwise Redis
 * would become unresponsive while evicting.
 *
 * The goal of this function is to improve the memory situation - not to
 * immediately resolve it.  In the case that some items have been evicted but
 * the "maxmemory" limit has not been achieved, an aeTimeProc will be started
 * which will continue to evict items until memory limits are achieved or
 * nothing more is evictable.
 *
 * This should be called before execution of commands.  If EVICT_FAIL is
 * returned, commands which will result in increased memory usage should be
 * rejected.
 *
 * Returns:
 *   EVICT_OK       - memory is OK or it's not possible to perform evictions now
 *   EVICT_RUNNING  - memory is over the limit, but eviction is still processing
 *   EVICT_FAIL     - memory is over the limit, and there's nothing to evict
 * */
int performEvictions(void) {
    if (!isSafeToPerformEvictions()) return EVICT_OK;

    int keys_freed = 0;
    size_t mem_reported, mem_tofree;
    long long mem_freed; /* May be negative */
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = EVICT_FAIL;

    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return EVICT_OK;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return EVICT_FAIL;  /* We need to free memory, but policy forbids. */

    unsigned long eviction_time_limit_us = evictionTimeLimitUs();

    mem_freed = 0;

    latencyStartMonitor(latency);

    monotime evictionTimer;
    elapsedStart(&evictionTimer);

    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            if (keys_freed % 16 == 0) {
                /* When the memory to free starts to be big enough, we may
                 * start spending so much time here that is impossible to
                 * deliver data to the replicas fast enough, so we force the
                 * transmission here inside the loop. */
                if (slaves) flushSlavesOutputBuffers();

                /* Normally our stop condition is the ability to release
                 * a fixed, pre-computed amount of memory. However when we
                 * are deleting objects in another thread, it's better to
                 * check, from time to time, if we already reached our target
                 * memory, since the "mem_freed" amount is computed only
                 * across the dbAsyncDelete() call, while the thread can
                 * release the memory all the time. */
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }

                /* After some time, exit the loop early - even if memory limit
                 * hasn't been reached.  If we suddenly need to free a lot of
                 * memory, don't want to spend too much time here.  */
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    // We still need to free memory - start eviction timer proc
                    if (!isEvictionProcRunning) {
                        isEvictionProcRunning = 1;
                        aeCreateTimeEvent(server.el, 0,
                                evictionTimeProc, NULL, NULL);
                    }
                    break;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    /* at this point, the memory is OK, or we have reached the time limit */
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;

cant_free:
    if (result == EVICT_FAIL) {
        /* At this point, we have run out of evictable items.  It's possible
         * that some items are being freed in the lazyfree thread.  Perform a
         * short wait here if such jobs exist, but don't wait long.  */
        if (bioPendingJobsOfType(BIO_LAZY_FREE)) {
            usleep(eviction_time_limit_us);
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
            }
        }
    }

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}


여기서 먼저 살펴볼 것은 리턴 값입니다. EVICT_OK는 used_memory가 maxmemory 보다 줄어들었거나, 아직 eviction을 수행할 수 가 없다는 것입니다. EVICT_RUNNING은 현재 eviction 이 계속 진행중이라는 뜻입니다. Single Threaded인 Redis가 어떻게 이렇게 동작할 수 있는지는 뒤에서 설명하겠습니다. 그리고 마지막으로 EVICT_FAIL은 여전히 메모리를 maxmemory 보다 많이 사용하지만, 현재 더 eviction해서 메모리를 줄일 데이터가 없다는 뜻입니다. 이 EVICT_FAIL이 나면 Redis는 OOM 에러를 던질 수 있습니다.

코드를 살펴보면 먼저 isSafeToPerformEvictions 함수가 나옵니다. 현재 eviction을 수행할 수 있는 상태인지를 살펴봅니다. 그래서 무시해도 되는 상황이면 eviction을 진행하지 않도록 0을 리턴합니다. 1이면 eviction 을 진행합니다.

/* Check if it's safe to perform evictions.
 *   Returns 1 if evictions can be performed
 *   Returns 0 if eviction processing should be skipped
 */
static int isSafeToPerformEvictions(void) {
    /* - There must be no script in timeout condition.
     * - Nor we are loading data right now.  */
    if (server.lua_timedout || server.loading) return 0;

    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return 0;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;

    return 1;
}

두번째로는 getMaxmemoryState 를 호출합니다. getMaxmemoryState는 현재 사용한 메모리 정보를 zmalloc_used_memory() 함수를 통해서 가져옵니다.

/* Get the memory status from the point of view of the maxmemory directive:
 * if the memory used is under the maxmemory setting then C_OK is returned.
 * Otherwise, if we are over the memory limit, the function returns
 * C_ERR.
 *
 * The function may return additional info via reference, only if the
 * pointers to the respective arguments is not NULL. Certain fields are
 * populated only when C_ERR is returned:
 *
 *  'total'     total amount of bytes used.
 *              (Populated both for C_ERR and C_OK)
 *
 *  'logical'   the amount of memory used minus the slaves/AOF buffers.
 *              (Populated when C_ERR is returned)
 *
 *  'tofree'    the amount of memory that should be released
 *              in order to return back into the memory limits.
 *              (Populated when C_ERR is returned)
 *
 *  'level'     this usually ranges from 0 to 1, and reports the amount of
 *              memory currently used. May be > 1 if we are over the memory
 *              limit.
 *              (Populated both for C_ERR and C_OK)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
    size_t mem_reported, mem_used, mem_tofree;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory();
    if (total) *total = mem_reported;

    /* We may return ASAP if there is no need to compute the level. */
    int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
    if (return_ok_asap && !level) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Compute the ratio of memory usage. */
    if (level) {
        if (!server.maxmemory) {
            *level = 0;
        } else {
            *level = (float)mem_used / (float)server.maxmemory;
        }
    }

    if (return_ok_asap) return C_OK;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;

    if (logical) *logical = mem_used;
    if (tofree) *tofree = mem_tofree;

    return C_ERR;
}

그리고 혹시나 maxmemory_policy 가 MAXMEMORY_NO_EVICTION이면 eviction을 안하니 바로 EVICT_FAIL 로 리턴합나다. 첫 번째로 OOMERR를 볼 수 있는 상황입니다.(maxmemroy_policy가 noeviction 일때…)

이제는 while 루프를 돌면서 메모리를 해제하게 됩니다. mem_tofree는 used – maxmemory 값 즉, 지금 얼마나 메모리를 해제해야 하는지를 나타내는 값이며, mem_freed 는 현재까지 확보한 메모리 크기입니다.

    while (mem_freed < (long long)mem_tofree) {
        ......
    }

이제 maxmemory-policy 정책에 따라서 조금 달라지게 됩니다. volatile 계열은 expire set 만(expire 로 ttl이 걸린 key들), ALLKEY 는 모든 key들을 대상으로 하기 때문에, 어떤 db에서 이를 처리할지가 결정되게 됩니다. Redis는 내부적으로 expire set을 관리하기 위해서 expires 라는 내부 변수를 가지고 있습니다. 전체 데이터는 dict 안에 있습니다.

Redis 는 eviction 작업을 좀 쉽게 하기 위해서 먼저 일부를 샘플링해서 eviciton 대상 pool을 만들고 이것을 eviction 하게 됩니다.

maxmemory-policy 가 LRU/LFU 종류거나 volatile_ttl 이면 다음과 같이 동작합니다.

현재 발견된 bestkey(eviction 대상) 가 없다면 evictionPoolPopulate 함수를 통해서 대상 풀을 먼저 만들고 여기서 bestkey를 찾아보게 됩니다.

만약에 maxmemory-policy가 RANDOM 계열이면, 정말 랜덤키를 가져와서 지우게 됩니다.

위의 두 경우에 bestkey를 찾는 것이 실패하면 cant_free로 점프하게 되고 EVICT_FAIL을 리턴하게 됩니다. 이 때 LAZY_FREE를 쓰면 조금 다르게 동작할 수 도 있습니다.(LAZY_FREE면 잠시 sleep 후에도 메모리 상황이 used_memory > maxmemory 이면 EVICT_FAIL 아니면 EVICT_OK를 던집니다. LAZY_FREE니 잠시 기다려보고 줄어들면 OK라는 거죠.)

짧게 Redis가 메모리가 부족할 때 eviction을 어떻게 처리하는지를 살펴보았습니다. 뭐 OOM 에러가 난다는 것은, 메모리가 부족한 상황이라는 것이므로, maxmemory-policy 정책을 바꿀 수 있다면… 바꾸건…(데이터가 날아가도 된다면, volatile 말고 allkey 로…) 아니면 메모리를 증설하는 것이 좋은 방법입니다.

[입 개발] Databricks Terraform Provider databricks_aws_s3_mount 와 resource databricks_dbfs_file

  • databricks terraform proivder 에서 databricks_aws_s3_mount 인데 datatbricks_s3_mount 라고 문서에 오타나있음.
  • databricks_dbfs_file 에서 0.2.9 까지는 다음과 같은 옵션이 필수(content_b64_md5)
resource "databricks_dbfs_file" "my_dbfs_file" {
  source = pathexpand("README.md")
  content_b64_md5 = md5(filebase64(pathexpand("README.md")))
  path = "/sri/terraformdbfs/example/README.md"
  overwrite = true
  mkdirs = true
  validate_remote_file = true
}
  • databricks_dbfs_file 에서 0.3.0 부터는 필요없어짐
resource "databricks_dbfs_file" "this" {
  source = "${path.module}/main.tf"
  path = "/tmp/main.tf"
}
  • databricks_aws_s3_mount 는 cluster_id 나 instance_profile 중에 하나가 있어야 하고 cluster_id 가 비면, 클러스터를 생성하고 mount 한 다음 종료한다.
  • databricks_aws_s3_mount 는 해당 cluster와 instance_profile 의 권한으로 mount 를 하므로, 다른 사용자는 해당 내용의 권한이 없으면 볼 수 없다.
  • databricks_dbfs_file 로 추가한 것은 누구나 볼 수 있다.

[입 개발] EMR에서는 sc.addFile, Databricks에서는 그냥 dbfs 폴더를 이용하자.

기존에 특정 파일을 Spark 클러스터에서 쓰기 위해서는 다음과 같은 방법을 이용했습니다. S3에 파일을 올리고 이를 addFile 한 후 해당 경로에 있다고 하고 사용하는 방식입니다.

EMR

val FileName = "FileName.dat"
val S3_PATH = s"s3://abcd/efg/hijk/$FileName"
spark.sparkContext.addFile(S3_PATH)

ReadFile(FileName)

Databricks 로 가면 위의 방식을 더 이상 사용할 수 없습니다. java.io.FileNotFound Exception이 발생하게 됩니다. 이를 위해서 어떻게 해야할까요? dbfs 가 Spark 클러스터에 마운트 된다는 것을 이용합니다. dbfs 에 파일을 올려두고 다음과 같이 사용하면 됩니다. 경로의 매칭이 어떻게 되는지만 주의하시면 됩니다.

Databricks

val FileName = "FileName.dat"
val DBFS_PATH = s"/dbfs/abcd/efg/hijk/$FileName"
ReadFile(DBFS_PATH)

[입 개발] rclone mount 빌드 방법

어쩌다보니 rclone 이라는 프로젝트를 윈도우에서 빌드해야할 일이 생겼습니다.

rclone/rclone: “rsync for cloud storage” – Google Drive, S3, Dropbox, Backblaze B2, One Drive, Swift, Hubic, Wasabi, Google Cloud Storage, Yandex Files (github.com)

그냥 소스를 다음과 같이 받아서 go build 만 하면 짠하고 빌드가 나옵니다.

git clone https://github.com/rclone/rclone
cd rclone
go build

그런데 이렇게 빌드를 하고 나면 해당 외부 스토리즈를 로컬로 mount 하는 mount 기능이 빠져있습니다. 그래서 간단하게 mount 기능 까지 포함해서 어떻게 빌드 하는지를 정리합니다.

먼저 mount 기능이 fuse 를 사용하기 때문에, cgofuse(billziss-gh/cgofuse: Cross-platform FUSE library for Go – Works on Windows, macOS, Linux, FreeBSD, NetBSD, OpenBSD (github.com)) 를 설치해야 합니다. 그리고 cgofuse 는 다시 winfsp 에 의존성이 있습니다. (billziss-gh/winfsp: Windows File System Proxy – FUSE for Windows (github.com))

이 의존성 때문에 winfsp의 설치가 먼저 필요합니다. 그런데 이름에서 부터 알수 있듯이 cgofuse 는 cgo를 씁니다. 그럼 c 코드를 뭔가 사용하는데 winfsp의 경우 꼭 풀 설치로 헤더와 라이브러리가 모두 설치되어야만 cgofuse 가 설치가 됩니다.

그리고 cgofuse가 컴파일러가 필요하기 때문에 mingw64 를 설치할 필요가 있습니다. (http://mingw-w64.org/doku.php/download/mingw-builds) 여기서 받아서 설치해줍니다.

이렇게 설치를 하고 mingw64의 gcc 가 있는 경로를 Path에 추가해 줍니다.

그렇게 잘 설치되었다면 다음으로 가볍게 빌드가 성공하게 됩니다.

set CPATH=C:\Program Files (x86)\WinFsp\inc\fuse
go build -tags cmount

이제 결과물을 잘 실행해보면 됩니다.

[입 개발] Spark Structured Streaming 에서 Offset 은 어떻게 관리되는가(아주 간략한 버전)?

DStream 만 열심히 쓰다가, Structured Streaming 을 강제로 써야할 일이 생겼습니다. 그런데 DStream을 쓸 때는 Offset 을 명확하게 가져오거나 볼 수 있었는데, Structured Streaming 을 보니, 아는게 없었습니다. -_-;;; 분명히 까만건 글씨고 하얀건 바탕인데…

먼저 Structured Streaming 에서 offset 을 지정하는 방법은 startingOffsets 을 지정하는 방법이 있습니다. 보통 “earliest” 와 “latest” 를 지정하지만, 시작 Offset을 명시적으로 지정할 수도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", "earliest") //earliest, latest
      .load()

earliest 는 현재 Kafka에서 해당 토픽이 가지고 있는 가장 빠른 offset, latest 는 가장 마지막 offset 입니다. 다음과 같이 Offset 을 명시적으로 지정할 수 도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", """{"test_logs": {"0": 100, "1": 200}""")
      .load()

일단 위의 offset 지정 방법은 checkpoint가 생성되기 전에 최초에만 지정되는 형태입니다. 즉 checkpoint 가 생성되면 위의 staringOffsets는 checkpoint 의 값을 사용하게 됩니다.

일단 Structured Streaming 을 하면 checkpoint 를 사용하게 되어 있습니다. checkpoint 를 설정하면, 다음과 같은 기본 디렉토리 들이 생깁니다.

  • commits – 완료된 batchId 가 저장된다.
    • commits/0, commits/1 이런식으로 파일이 저장된다.
  • metadata
    • 잘 모릅니다.
  • offsets
    • 실제 offsets 이 저장됩니다.
    • offsets/0, offsets/1, offsets/2 이런식으로 Offset 이 저장되어 있다.
  • sources
    • 잘 모릅니다.

일단 잘 모르는 것들을 제외하고 나서 남아있는 offsets 와 commits 입니다. 여기서 offsets 안에는 batchId 가 있는데, 이 파일 안에 실제로 offset이 들어가 있습니다. 다음과 같이 버전 {정보} {오프셋} 형태로 들어가 있습니다. 실제 오프셋은 {topic_name: {partitionId: offset} 구조 입니다. 마지막줄의 내용이, 위에서 보았던, Specific Offset 지정 방법과 동일한 형태인 것을 볼 수 있습니다.

v1
{“batchWatermarkMs”:0,”batchTimestampMs”:1615293540745,”conf”:{“spark.sql.streaming.stateStore.providerClass”:”org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,”spark.sql.streaming.join.stateFormatVersion”:”2″,”spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion”:”2″,”spark.sql.streaming.multipleWatermarkPolicy”:”min”,”spark.sql.streaming.aggregation.stateFormatVersion”:”2″,”spark.sql.shuffle.partitions”:”200″}}
{“test_logs”:{“0″:1058826043,”1″:1058853622,”2”:1058871214}}

실제로 이 offset 을 남기는 부분은 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceInitialOffsetWriter.scala 파일을 보시면 됩니다.

여기서 Offset 이 어떻게 관리되고 처리되는지 살짝 알아보도록 하겠습니다. 먼저 offsets/{nextBatchId} 로 파일이 먼저 생성됩니다. 이 파일이 있으면 현재 offset 부터 nextBatchId 에 있는 offset 까지 처리가 되게 됩니다. 해당 파일이 없으면 latest offset까지가 다음 offset 이 되면서 해당 파일이 만들어지게 됩니다. nextBatchId 는 일종의 WAL(Write ahead Log) 의 역할을 합니다.

그리고 하나의 MicroBatch 가 완료되면 commits 에 해당 batchId 가 생성되면서 완료를 확인합니다. 즉 commits/3 까지 있고 offsets/3 이 있다면, batchId 3번 까지 완료가 되었다는 뜻입니다.

그럼 이제 실제로 예를 한번 들어보겠습니다.

startingOffsets 를 earliest 로 설정하고 해당 값이 100 이고 처음 batch가 시작된다고 하면 offsets/0 파일에 {“test_logs”:{“0”: 200}} 이렇게 저장되어 있다면, 첫 배치에서 Offset 100 ~ Offset 199 까지 처리하게 됩니다. 그래서 혹시나 Offset 을 조작할 일이 있다면, 처리된 commits/{batchId} 를 지우거나, N번째와 N-1번째를 수정해주면 동작할 것 같습니다.(지우기는 해봤는데… 막 수정까지는 하지 안해봤습니다. 다만 Validation 검증 코드가 없는 걸로 봐서 동작할꺼 같습니다. 둘 다 바꾸기는 귀찮아서…)

처음에 살짝 당황했던건 offsets/0 에 있는 값이 실제로 준 offset 보다 이후 값이었는데, 왜 그런지 몰라서 한참 고민을 했습니다. offsets/0 에 있는 값이 DStream 의 untilOffset과 유사하다고 보시면 됩니다.

그리고 여기 offsets 안에 저장하는 값은 KafkaSourceOffset 과 KafkaSourceInitialOffsetWriter(HDFSMetadataLog) 을 보시면됩니다. RateLimit 가 설정이 되면 ./external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala 의 latestOffset 함수를 보면 설정에 따라서 다음 offset이 설정이 됩니다.

삽질(실험?) 과 소스를 보면서 이해한 Offset에 대한 아주 간략한 내용입니다. 위의 설명을 보면 아시겠지만, 위의 내용은 거의 Kafka 전용이고, 다른 것들은 내용이 다를 것으로 보입니다.

[입 개발] Kafka 와 Spark Structured Streaming 에서 checkpoint 에서 아주 과거의 Offset이 있으면 어떻게 동작할까?

최근에 Kafka와 Spark Structured Streaming 코드를 작성할 일이 있어서 작업을 하는데, 이상한 에러를 만났습니다. 그렇습니다. 여러분 저는 Spark Structured Streaming 이 처음이라… 흑 초초보의 슬픔이네요.

에러는 다음과 같았습니다. empty 인 DataFrame을 쓸 수 없다는 것인데요.

21/03/05 07:14:04 ERROR MicroBatchExecution: Query [id = c80a0c8a-b6fe-4961-ae6c-a79a880cb369, runId = e9993a73-6579-4cd9-9ab4-fdbd2995ae6d] terminated with error
org.apache.spark.sql.AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         ;
	at org.apache.spark.sql.execution.datasources.DataSource$.validateSchema(DataSource.scala:950)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:595)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:294)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:884)

분명히 카프카에는 데이터가 계속 들어오고 있는 상황인데, 이유를 알 수 없어서, 몇 시간을 날렸는데… 바꾼 다른 코드의 문제인가 싶어서 이것저것 바꿔보고, 컴파일 And 실행의 반복만…(플랫폼을 EMR에서 Databricks로 그리고 Spark 2.x 에서 3.0.1로 바꾸고 Scala 버전도 2.11 에서 2.12로 바뀌면서 이래저래 이슈가 좀 있었는데… 그 탓인줄…)

그러다가 우연히… CheckPoint를 지우고 나니… 정상적으로 실행이 잘 되는것이었습니다. 그 때 든 생각은 아, 뭔가 Offset에 문제가 있구나라는 생각이 들었습니다. 사실 이 문제가 발생하는 조건을 분석한 다음에 보면, 어떻게 보면 당연한 문제인데요. 문제가 되었던 상황은 같은 토픽으로 checkpoint 가 만들어졌는데, 제가 게을러서 Kafka Retention 기간이 지나서 다시 실행했던 것이었습니다.

즉 문제 상황은, 이미 checkpoint 에는 그 시점의 Offset 이 들어가 있는데, Kafka에서는 이미 Retention 정책으로 인해서 그 부분의 데이터는 없어진 상황인거죠.(checkpoint는 지정한 위치의 offsets 디렉토리 안에 batchId가 생기고 그 안에 있습니다.)

그런데 기본적으로 Spark에서 Kafka가 Offset 을 아주 과거로 주면, earlist 로 동작하게 되어있습니다. (소스는 보다가 아직 이해가 안되서 이걸 정확하게 설명할 정도로 보지는 못했습니다. 나중에 이해되면 추가할께요 관심이 있으시면, 다음 코드들을 보시면 좋을듯 합니다.





sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

그런데 이게 CheckPoint 안에 들어가 있는 Offset 은 그냥 그대로 처리하는 것으로 보입니다. 그럼 결론부터 생각하면 이게 문제가 되는가? 라고 하면, 큰 문제는 아닙니다. 위에서 에러가 발생한 것은 데이터가 0인 DataFrame 을 쓸려고 해서 생긴문제입니다.

다시 문제로 돌아가서 그렇다면 위와 같은 상황에서 데이터는 어떻게 처리가 될까요? 일단 조건으로 백프레셔 설정은 없다고 가정합니다. 그리고 checkpoint 에 저장되어 있던 값이 10001 이고 Kafka Log Retention 에 의해서 중간에 10001~70000 까지가 사라지고,현재 earliest가 70001 이라고 가정합니다. 그리고 현재 latest 는 100000 이라고 하겠습니다. 그러면 다음과 같이 MicroBatch 가 발생하게 됩니다. 그러면 다음과 같이 처리가 되게 됩니다.

BatchIDCount비고
N0존재하지 않는 10001~70000 까지의 처리량
N+130000earliest 부터 현재 시간까지 들어온 양 30000
N+2들어오는 만큼원래 스트리밍에서 처리해야 할 양

다행인 부분은 첫 부분에서 0으로 처리가 되기는 하지만, 그 다음 부터는 다시 정상(?)적으로 커버가 되게 된다는 점입니다.

다시 결론부터 얘기하자면, 위의 상황은 정상적으로 생각하면 발생하면 안되는 상황입니다. Kafka Retention 정책보다 뒤에 처리 스트리밍 코드를 돌린다는 것은 결국 Data Loss 상황인데, 이건 어떻게 보면 심각한 문제입니다. 저 같은 경우에는 개발 과정에서, 휴가 다녀온 다음에 실행해서 발생한 이슈였습니다. 물론 지식이 부족하기도 하지만요. 그래도 덕분에 카프카와 Spark Structured Streaming 에서 offset 이 어떻게 처리되는지 살짝 속내를 볼 수 있었네요.(휴, 왜 저만 이런일들이 벌어지는지…)

[입 개발] HikariCP 는 왜 나를 물먹이는가…

HikariCP는 왜 저를 물먹이는걸까요?… 정답은 제가 못나서 입니다. 흑흑흑, 오늘은 HikariCP를 사용하다가 겪는 일반적인 상황과는 전혀 상관없이 그냥 제가 겪은… 삽질을 공유하려고 합니다.

HikariCP를 아주 특이하게 사용하고 있었습니다. 그런데, 문제는 제가 자바맹이라서 HikariCP에 대한 지식이 일도 없다는 거죠. 히카리는 빛이라는 일본어라는 것만 알고 있습니다.(역시 H2!!!)

코드를 간략하게 정리하면 다음과 같은 형태로 사용하고 있었습니다. write가 가능한 DB 유저와 read가 가능한 DB 유저로 구분해서 사용하고 있었습니다. 아래와 같은 구조는 지양하셔야 합니다. 몰랐던거라…

    val dbConf = new HikariConfig()
    dbConf.setUsername(writeUser)
    dbConf.setPassword(writeUserPass)
    dbConf.addDataSourceProperty("user", readUser)
    dbConf.addDataSourceProperty("password", readUserPass)

그래서 read만 필요한 경우에 다음과 같이 DataSourceProperties를 넘겨서 readUser로 사용하는 케이스 였습니다.

dbConf.getDataSourceProperties

이것 자체가 올바른 사용방법은 아닌데, 이 방법을 혹시나 쓰신다면 HikariCP 3.2.0 이상에서부터만 가능합니다.

그런데 이상하긴 하지만, 이렇게 아주 잘 사용하고 있었습니다. 그런데 갑자기 다른 플랫폼으로 이전을 했더니, 기존에 잘 쓰던 writeUser 대신에 readUser로 접속을 하면서, 기존에 사용하던 것들이 모두 실패하기 시작했습니다.

일단 다음과 같은 형태로 움직이기 시작했습니다.

  1. 각 DB 계정의 권한이 명확한지 확인!
  2. 특정 버전 사이의 코드 변경 확인
  3. 각 플랫폼의 Library 버전 확인

각 DB의 계정은 명확했습니다. 메뉴얼 한 테스트 및, show grants for ‘writeUser’ 를 해봐도 명확하게 기 설정된 정보가 그대로 존재했었습니다. 특정 버전 사이의 코드도 전과 바뀐 것이 없는 것을 확인하자. 이제 생각은 아 HikariCP의 버전이 다르겠구나라는 생각이 들었습니다.

생각이 거기까지 들자, 버전을 확인해보니, 제가 쓰던 버전은 3.4.2 였고, 새로운 플랫폼의 버전은 3.1.0이었습니다. 그런데, 자바맹이다 보니 이걸 당장 분석할 시간은 없고, 일단은 뭔가 꽁수로 처리를 하고, 해당 건은 나중에 자세히 알아봐야지 할려고 하다가… 좀 시간이 나서 HikariCP를 보기 시작했습니다. 사실 3.1.0 -> 3.4.2 사이에 엄청 큰 변화는 제가 본 코드에는 없었습니다.(사실, README도 안보고, 그냥 쭈루룩 넘어간…)

그런데 그러면 왜 이런 문제가 생긴것일까요? 이걸 제대로 이해하기 위해서는 일단 간단히 이 부분에 대한 HikariCP의 구조를 알아야 합니다. 위의 소스에서 보듯이 configuration을 저장하고 있는 것은 HikariConfig 입니다. 그리고 다음과 같이 DataSource를 만들어서 사용하고 있습니다.

val dbConf = HikariConfig()
val ds = new HikariDataSource(dbConf)

그리고 필요한 connection 은 다음과 같이 DataSource 의 getConnection()을 호출합니다.

val connection = ds.getConnection()

그럼 이제 실제 DataSource의 getConnection 이 어떻게 동작하는지 살펴보도록 하겠습니다. 우리가 볼 것은 HikariDataSource인걸 그 위의 소스에서 알 수 있습니다. 그런데 HikariDataSource의 getConnection 은 아주 복잡합니다.(제 기준에서요.)

   @Override
   public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }

히카리한 코드입니다. 자세한 내용은 모르겠고, 일단 pool 이 없으면 pool을 생성합니다. new HikariPool(this) 이라는 코드가 보이네요. 그런데 요 생성자를 따라가보면, 많은 것을 하고 있습니다. 일단 생성자를 따라가기 전에 먼저 HikariPool의 구조를 보시면 PoolBase 라는 것을 상속 받고 있습니다. 나머지는 일단 패스~~~~(모르는건, 아는척 넘어갑시다.)

public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateListener {
}

그리고 실제 HikariPool의 생성자를 살펴봅니다. 중요한 부분이 많지만 과감하게 날려버립니다.

public HikariPool(final HikariConfig config)
   {
      super(config);

      this.connectionBag = new ConcurrentBag<>(this);
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

      checkFailFast();

      ......
   }

이제 checkFailFast() 라는 함수가 보입니다. 여기서 보시면 PoolEntry를 createPoolEntry()라는 함수로 생성하고, 성공하면, connectionBag에 넣어주고 끝입니다.

private void checkFailFast()
   {
      final long initializationTimeout = config.getInitializationFailTimeout();
      if (initializationTimeout < 0) {
         return;
      }

      final long startTime = currentTime();
      do {
         final PoolEntry poolEntry = createPoolEntry();
         if (poolEntry != null) {
            if (config.getMinimumIdle() > 0) {
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
            }
            else {
               quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
            }

            return;
         }

         if (getLastConnectionFailure() instanceof ConnectionSetupException) {
            throwPoolInitializationException(getLastConnectionFailure().getCause());
         }

         quietlySleep(SECONDS.toMillis(1));
      } while (elapsedMillis(startTime) < initializationTimeout);

      if (initializationTimeout > 0) {
         throwPoolInitializationException(getLastConnectionFailure());
      }
   }

이 뒤에서 뭔가 동작이 일어나면 복잡하겠지만, 일단 createPoolEntry() 함수를 봅니다. 전에 절 괴롭혔던(전, 항상 괴롭힘을 당합니다. MaxLifeTime 값을 이용하는 것이 보이지만, 우리의 관심은 그쪽은 아닙니다.

   private PoolEntry createPoolEntry()
   {
      try {
         final PoolEntry poolEntry = newPoolEntry();

         final long maxLifetime = config.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
            final long lifetime = maxLifetime - variance;
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(connectionBag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }

         return poolEntry;
      }
      catch (ConnectionSetupException e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());
            lastConnectionFailure.set(e);
         }
      }
      catch (Exception e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            logger.debug("{} - Cannot acquire connection from data source", poolName, e);
         }
      }

      return null;
   }

다시 newPoolEntry() 함수를 따라갑니다. 좀 딥하게 내려가지만, 일차선 도로입니다. 빠질 곳이 없지요.

   PoolEntry newPoolEntry() throws Exception
   {
      return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
   }

newPoolEntry는 쉽습니다. newConnection()만 보면 되겠네요. newConnection() 함수에서는 밑에 connection 부분만 보시면 됩니다. username 과 password를 config(HikariConfig 입니다.) 에서 가져와서, username == null 이면, dataSource의 getConnection() 을 없으면 dataSource의 getConnection(username, password)를 호출합니다.

   private Connection newConnection() throws Exception
   {
      final long start = currentTime();

      Connection connection = null;
      try {
         String username = config.getUsername();
         String password = config.getPassword();

         connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
         if (connection == null) {
            throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
         }

         setupConnection(connection);
         lastConnectionFailure.set(null);
         return connection;
      }
      catch (Exception e) {
         if (connection != null) {
            quietlyCloseConnection(connection, "(Failed to create/setup connection)");
         }
         else if (getLastConnectionFailure() == null) {
            logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());
         }

         lastConnectionFailure.set(e);
         throw e;
      }
      finally {
         // tracker will be null during failFast check
         if (metricsTracker != null) {
            metricsTracker.recordConnectionCreated(elapsedMillis(start));
         }
      }
   }

그럼 이 dataSource는 어디서 온 것일까요? 정체는 무엇일까요? 그걸 확인할려면 아까 HikariPool의 생성자에 있던 super(config); 구문을 따라가야 합니다. HikariPool은 뭘 상속 받았죠? 네! 그렇습니다. PoolBase 로 가봅시다.(그런데 사실 아까 newPoolEntry 부터 이미 PoolBase 였다는 사실은 안 비밀입니다.)

PoolBase(final HikariConfig config)
   {
      this.config = config;

      ......
      initializeDataSource();
   }

PoolBase의 생성자는 크게 딴게 없이 HikariConfig을 config으로 저장하고 initializeDataSource()를 호출하게 됩니다. 이 함수가 재미있는데, driverClass가 설정되었는지 jdbcUrl이 설정되었는지에 따라서 다르게 동작하는데. 일단 처음에는 DataSource가 내부적으로 없기 때문에 생성해야 합니다.

   private void initializeDataSource()
   {
      final String jdbcUrl = config.getJdbcUrl();
      final String username = config.getUsername();
      final String password = config.getPassword();
      final String dsClassName = config.getDataSourceClassName();
      final String driverClassName = config.getDriverClassName();
      final String dataSourceJNDI = config.getDataSourceJNDI();
      final Properties dataSourceProperties = config.getDataSourceProperties();

      DataSource ds = config.getDataSource();
      if (dsClassName != null && ds == null) {
         ds = createInstance(dsClassName, DataSource.class);
         PropertyElf.setTargetFromProperties(ds, dataSourceProperties);
      }
      else if (jdbcUrl != null && ds == null) {
         ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password);
      }
      else if (dataSourceJNDI != null && ds == null) {
         try {
            InitialContext ic = new InitialContext();
            ds = (DataSource) ic.lookup(dataSourceJNDI);
         } catch (NamingException e) {
            throw new PoolInitializationException(e);
         }
      }

      if (ds != null) {
         setLoginTimeout(ds);
         createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl);
      }

      this.dataSource = ds;
   }

ds를 생성하는 DriverDataSource 부분을 확인해 봅시다. 파라매터로 config 의 username 과 password가 넘어갑니다.

   public DriverDataSource(String jdbcUrl, String driverClassName, Properties properties, String username, String password)
   {
      this.jdbcUrl = jdbcUrl;
      this.driverProperties = new Properties();

      for (Entry<Object, Object> entry : properties.entrySet()) {
         driverProperties.setProperty(entry.getKey().toString(), entry.getValue().toString());
      }

      if (username != null) {
         driverProperties.put(USER, driverProperties.getProperty("user", username));
      }
      if (password != null) {
         driverProperties.put(PASSWORD, driverProperties.getProperty("password", password));
      }

      ......
   }

중요하지 않은 부분을 다 날리고 보면, 저는 처음에 버그인줄 알았는데, username, password가 null 이 아닐 때, properties 에 “user”, 와 “password”가 있다면, 가지고 있는 username, password를 디폴트 값으로 셋팅합니다. 즉 property에 “user”, “password”가 설정되어 있다면, 이걸 무조건 우선적으로 쓰게 됩니다. 네네, 즉 둘 다 셋팅이 되면, 여기서 driverProperties는 값을 덮어씌우게 됩니다. 올레!!!, 그렇지 이 부분입니다!!! 라고 말하고 싶지만… 이 코드는 3.1.0 도 동일합니다. 원래 코드를 한번 다시 살펴보시죠.

    val dbConf = new HikariConfig()
    dbConf.setUsername(writeUser)
    dbConf.setPassword(writeUserPass)
    dbConf.addDataSourceProperty("user", readUser)
    dbConf.addDataSourceProperty("password", readUserPass)

그렇습니다. 우리의 코드는 처음부터 둘 다 쓰고 있었습니다. 그런데 위의 부분을 보면… 무조건 readUser, readUserPass 가 writeUser, writeUserPass로 설정된 값을 덮어써야만 합니다. 즉, 이전에도 동작을 안해야 정상이라는 겁니다. 여기서 사실 1차 좌절을 하게 됩니다. “왜 안되지!!!” 보다 더 무서운 “왜 되지!!!” 만나게 된겁니다. 저도 멘붕에 다시 한번 빠집니다.

그런데 여기서 아까 살짝 놓치고 지나간 부분이 생각났습니다. 아마도 여러분들은 다 맞추셨을꺼 같습니다. 전 초초초초초초초보 개발자라 T.T

혹시 아까 newConnection() 함수가 기억나시나요? 다시 가져오면 아래와 같습니다. 실제로 driverProperties가 설정되는 것은 위와 같지만… 우리가 사용하는 것은 아래 함수입니다.

connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);

오오 이 부분이 다르지 않을까 하고 3.1.0 소스를 봤더니!!! 당연하게도!!!, 똑 같습니다.(어어어, 이게 아닌데…)

음… 음.. 음… 다시 또 2차 좌절을 하는데…

dataSource 의 getConnection 함수가 다릅니다. 덜덜덜 아까 우리가 봤던 dataSource 는 DriverDataSource 입니다.

3.4.0 – cloned Properties를 만들고 여기에 user 와 password 를 넘겨 받은 값으로 셋팅합니다. null 이면 그냥 이전값 사용.

   public Connection getConnection(final String username, final String password) throws SQLException
   {
      final Properties cloned = (Properties) driverProperties.clone();
      if (username != null) {
         cloned.put("user", username);
         if (cloned.containsKey("username")) {
            cloned.put("username", username);
         }
      }
      if (password != null) {
         cloned.put("password", password);
      }

      return driver.connect(jdbcUrl, cloned);
   }

3.1.0 – 그냥 원래의 getConnection()을 그대로 호출합니다. 이건 뭥미!!!

   public Connection getConnection(String username, String password) throws SQLException
   {
      return getConnection();
   }

실제의 getConnection() 함수는 둘다 다음과 같이 동일합니다. 아까 우리가 만든 driverProperties 를 사용하고 있습니다.

   public Connection getConnection() throws SQLException
   {
      return driver.connect(jdbcUrl, driverProperties);
   }

결론적으로, getConnection함수에서 username, password를 처리하는 부분이 바뀌었기 때문에 3.4.0 에서는 의도한 대로 둘 다 동작을 했지만, 3.1.0 에서는 제대로 동작하지 않았던 것입니다. 자세한 패치는 https://github.com/brettwooldridge/HikariCP/commit/851e2d4592b52a9c367ada2c76f013b1d4e20ac3 를 보시면 됩니다.

코드를 보면 HikariCP의 개발자는 제가 사용한 것과 같은 방법을 쓸 거라고는 생각은 안했고, 이게 맞는 방향이 아니었던 것입니다. 흑… 역시 모르면 삽질하네요 T.T