[입 개발] memcached 의 binary protocol 과 text protocol의 동작은 조금씩 다르다.

보통 가장 유명한 캐시 솔루션을 뽑으라면… Memcached 나 Redis 가 될 가능성이 높습니다.(당연히 다른 좋은 솔루션들도 많지만…) 가장 많이 쓰이는 건 이 두가지가 아닐듯 합니다.

Redis의 경우는 text protocol 만 지원하지만, memcached의 경우는 text와 binary protocol 두 가지를 모두 지원합니다. 실제로 binary protocol을 사용하면 당연히 속도도 더 빠릅니다.

그런데… 당연히 text/binary protocol 두 가지를 지원하다고 하면 두 동작이 모두 동일할꺼라고 생각하기 쉽습니다. 그러나… memcached 에서 text와 binary protocol은 미묘하게 동작이 다른 경우가 있습니다. 그리고 그 대표적인 예가 incr 입니다.

먼저 text protocol에서 “incr” 커맨드의 동작을 살펴보면 다음과 같습니다. 실제 text protocol은 process_command 라는 함수에서 동작하게 됩니다. 소스가 엄청 길어서 중간에 “incr”를 찾아보면 process_arithmetic_command 함수를 호출한다는 것만 알면됩니다.

static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */

    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false);
    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

        process_update_command(c, tokens, ntokens, comm, false);

    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {

        process_update_command(c, tokens, ntokens, comm, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {

        process_arithmetic_command(c, tokens, ntokens, 1);

    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

        process_get_command(c, tokens, ntokens, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

        process_arithmetic_command(c, tokens, ntokens, 0);

    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

        process_delete_command(c, tokens, ntokens);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {

        process_touch_command(c, tokens, ntokens);

    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {

        process_stat(c, tokens, ntokens);
    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
        time_t exptime = 0;
        rel_time_t new_oldest = 0;

        set_noreply_maybe(c, tokens, ntokens);

        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.flush_cmds++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        if (!settings.flush_enabled) {
            // flush_all is not allowed but we log it on stats
            out_string(c, "CLIENT_ERROR flush_all not allowed");
            return;
        }

        if (ntokens != (c->noreply ? 3 : 2)) {
            exptime = strtol(tokens[1].value, NULL, 10);
            if(errno == ERANGE) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
        }

        /*
          If exptime is zero realtime() would return zero too, and
          realtime(exptime) - 1 would overflow to the max unsigned
          value.  So we process exptime == 0 the same way we do when
          no delay is given at all.
        */
        if (exptime > 0) {
            new_oldest = realtime(exptime);
        } else { /* exptime == 0 */
            new_oldest = current_time;
        }
        if (settings.use_cas) {
            settings.oldest_live = new_oldest - 1;
            if (settings.oldest_live <= current_time)
                settings.oldest_cas = get_cas_id();
        } else {
            settings.oldest_live = new_oldest;
        }
        out_string(c, "OK");
        return;

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {

        out_string(c, "VERSION " VERSION);

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {

        conn_set_state(c, conn_closing);

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "shutdown") == 0)) {

        if (settings.shutdown_command) {
            conn_set_state(c, conn_closing);
            raise(SIGINT);
        } else {
            out_string(c, "ERROR: shutdown not enabled");
        }
    } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
        if (ntokens == 5 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0) {
            int src, dst, rv;

            if (settings.slab_reassign == false) {
                out_string(c, "CLIENT_ERROR slab reassignment disabled");
                return;
            }

            src = strtol(tokens[2].value, NULL, 10);
            dst = strtol(tokens[3].value, NULL, 10);

            if (errno == ERANGE) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }

            rv = slabs_reassign(src, dst);
            switch (rv) {
            case REASSIGN_OK:
                out_string(c, "OK");
                break;
            case REASSIGN_RUNNING:
                out_string(c, "BUSY currently processing reassign request");
                break;
            case REASSIGN_BADCLASS:
                out_string(c, "BADCLASS invalid src or dst class id");
                break;
            case REASSIGN_NOSPARE:
                out_string(c, "NOSPARE source class has no spare pages");
                break;
            case REASSIGN_SRC_DST_SAME:
                out_string(c, "SAME src and dst class are identical");
                break;
            }
            return;
        } else if (ntokens == 4 &&
            (strcmp(tokens[COMMAND_TOKEN + 1].value, "automove") == 0)) {
            process_slabs_automove_command(c, tokens, ntokens);
        } else {
            out_string(c, "ERROR");
        }
    } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "lru_crawler") == 0) {
        if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "crawl") == 0) {
            int rv;
            if (settings.lru_crawler == false) {
                out_string(c, "CLIENT_ERROR lru crawler disabled");
                return;
            }

            rv = lru_crawler_crawl(tokens[2].value);
            switch(rv) {
            case CRAWLER_OK:
                out_string(c, "OK");
                break;
            case CRAWLER_RUNNING:
                out_string(c, "BUSY currently processing crawler request");
                break;
            case CRAWLER_BADCLASS:
                out_string(c, "BADCLASS invalid class id");
                break;
            case CRAWLER_NOTSTARTED:
                out_string(c, "NOTSTARTED no items to crawl");
                break;
            }
            return;
        } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) {
            uint32_t tocrawl;
             if (!safe_strtoul(tokens[2].value, &tocrawl)) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
            settings.lru_crawler_tocrawl = tocrawl;
            out_string(c, "OK");
            return;
        } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "sleep") == 0) {
            uint32_t tosleep;
            if (!safe_strtoul(tokens[2].value, &tosleep)) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
            if (tosleep > 1000000) {
                out_string(c, "CLIENT_ERROR sleep must be one second or less");
                return;
            }
            settings.lru_crawler_sleep = tosleep;
            out_string(c, "OK");
            return;
        } else if (ntokens == 3) {
            if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "enable") == 0)) {
                if (start_item_crawler_thread() == 0) {
                    out_string(c, "OK");
                } else {
                    out_string(c, "ERROR failed to start lru crawler thread");
                }
            } else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "disable") == 0)) {
                if (stop_item_crawler_thread() == 0) {
                    out_string(c, "OK");
                } else {
                    out_string(c, "ERROR failed to stop lru crawler thread");
                }
            } else {
                out_string(c, "ERROR");
            }
            return;
        } else {
            out_string(c, "ERROR");
        }
    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
        process_verbosity_command(c, tokens, ntokens);
    } else {
        out_string(c, "ERROR");
    }
    return;
}

다시 process_arithmetic_command를 살펴보면 다음과 같습니다.

static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
    char temp[INCR_MAX_STORAGE_LEN];
    uint64_t delta;
    char *key;
    size_t nkey;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if (!safe_strtoull(tokens[2].value, &delta)) {
        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
        return;
    }

    switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
    case OK:
        out_string(c, temp);
        break;
    case NON_NUMERIC:
        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
        break;
    case EOM:
        out_of_memory(c, "SERVER_ERROR out of memory");
        break;
    case DELTA_ITEM_NOT_FOUND:
        pthread_mutex_lock(&c->thread->stats.mutex);
        if (incr) {
            c->thread->stats.incr_misses++;
        } else {
            c->thread->stats.decr_misses++;
        }
        pthread_mutex_unlock(&c->thread->stats.mutex);

        out_string(c, "NOT_FOUND");
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        break; /* Should never get here */
    }
}

case DELTA_ITEM_NOT_FOUND 를 보면 아이템이 없으면 incr_misses를 증가시키고, NOT_FOUND를 리턴합니다. 즉 아이템이 없으면 단순히 없다고 리턴하는 것입니다.

그런데 incr의 binary protocol에서의 동작을 살펴보면 조금 미묘하게 다릅니다.
일단 binary protocol 에서 incr이 수행될때 최종 함수는 complete_incr_bin 이라는 함수입니다.

static void complete_incr_bin(conn *c) {
    item *it;
    char *key;
    size_t nkey;
    /* Weird magic in add_delta forces me to pad here */
    char tmpbuf[INCR_MAX_STORAGE_LEN];
    uint64_t cas = 0;

    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
    protocol_binary_request_incr* req = binary_get_request(c);

    assert(c != NULL);
    assert(c->wsize >= sizeof(*rsp));

    /* fix byteorder in the request */
    req->message.body.delta = ntohll(req->message.body.delta);
    req->message.body.initial = ntohll(req->message.body.initial);
    req->message.body.expiration = ntohl(req->message.body.expiration);
    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        int i;
        fprintf(stderr, "incr ");

        for (i = 0; i < nkey; i++) {
            fprintf(stderr, "%c", key[i]);
        }
        fprintf(stderr, " %lld, %llu, %d\n",
                (long long)req->message.body.delta,
                (long long)req->message.body.initial,
                req->message.body.expiration);
    }

    if (c->binary_header.request.cas != 0) {
        cas = c->binary_header.request.cas;
    }
    switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
                     req->message.body.delta, tmpbuf,
                     &cas)) {
    case OK:
        rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
        if (cas) {
            c->cas = cas;
        }
        write_bin_response(c, &rsp->message.body, 0, 0,
                           sizeof(rsp->message.body.value));
        break;
    case NON_NUMERIC:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, NULL, 0);
        break;
    case EOM:
        out_of_memory(c, "SERVER_ERROR Out of memory incrementing value");
        break;
    case DELTA_ITEM_NOT_FOUND:
        if (req->message.body.expiration != 0xffffffff) {
            /* Save some room for the response */
            rsp->message.body.value = htonll(req->message.body.initial);

            snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu",
                (unsigned long long)req->message.body.initial);
            int res = strlen(tmpbuf);
            it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
                            res + 2);

            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, res);
                memcpy(ITEM_data(it) + res, "\r\n", 2);

                if (store_item(it, NREAD_ADD, c)) {
                    c->cas = ITEM_get_cas(it);
                    write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
                } else {
                    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
                                    NULL, 0);
                }
                item_remove(it);         /* release our reference */
            } else {
                out_of_memory(c,
                        "SERVER_ERROR Out of memory allocating new item");
            }
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
                c->thread->stats.incr_misses++;
            } else {
                c->thread->stats.decr_misses++;
            }
            pthread_mutex_unlock(&c->thread->stats.mutex);

            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        }
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
        break;
    }
}

아까 전과 비슷한 코드들이 있습니다. 다시 case DELTA_ITEM_NOT_FOUND 를 찾아보면 뭔가 다른 부분을 볼 수 있을 것입니다. else 문 부분은 text_protocol과 동일한데, if 문이 뭔가 추가되었네요.

        if (req->message.body.expiration != 0xffffffff) {
            /* Save some room for the response */
            rsp->message.body.value = htonll(req->message.body.initial);
    
            snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu",
                (unsigned long long)req->message.body.initial);
            int res = strlen(tmpbuf);
            it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
                            res + 2);
    
            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, res);
                memcpy(ITEM_data(it) + res, "\r\n", 2);

                if (store_item(it, NREAD_ADD, c)) {
                    c->cas = ITEM_get_cas(it);
                    write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
                } else {
                    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
                                    NULL, 0);
                }
                item_remove(it);         /* release our reference */
            } else {
                out_of_memory(c,
                        "SERVER_ERROR Out of memory allocating new item");
            }
        }

코드를 보면 expiration 이 -1이 아니면 값을 셋팅하는 부분이 있습니다. 즉 binary protocol에서는 해당 키가 없더라도… 해당 시점에 생성이 가능합니다. text protocol 에서는 불가능 한데 말이죠. 또한, 최초에 아이템이 없을때만 생성이되고, 있을 경우에는 expiration을 변경하지도 않습니다. 코드를 보니… 궁금증들이 해결이 되는 것 같습니다.

[입 개발] RabbitMQ를 제대로 설치하는 방법

개발을 하다보면, Queue를 써야 하는 경우가 종종 있습니다. 뭔가를 비동기 적으로 처리하기 위해서, 또는 다이렉트로 들어가는 부담을 줄이기 위해서…

그런데 그러다보면 항상 고민되는 것이 바로 HA 입니다. 뭐, Queue의 모든 데이터가 날아가도 된다. 이러면 사실 큰 문제가 아니겠지만… 일반적으로 Queue의 데이터를 그렇게 날려도 되는 경우는 많지 않습니다.

일반적으로 많이 사용하는 Queue 는 여기서 소개할 RabbitMQ, ActiveMQ, Redis 기반의 Resque나 SideKiq(Resque 보다는 Sidekiq이 ㅎㅎㅎ) 아니면, 로그나 이런 대규모 부하에서는 거의 디팩토인 kafka 가 있습니다.

일단 각각 장단점이 있기 때문에, 여기서는 꽤 안정성을 보장해 주는 RabbitMQ의 HA를 구성하는 설치에 대해서만 설명하도록 하겠습니다.(다만 RabbitMQ의 경우는 운영하다 Consumer 가 느리면… 뻗어버리는 단점이…)

사실 설치 자체는 RabbitMQ 사용법을 그대로 따르면 됩니다. 별로 어려운 것도 없습니다.

CentOS면 yum으로, Ubuntu 면 apt로 쉽게 설치가 가능합니다. Rabbitmq 공식 페이지를 참조하면 됩니다.

그리고 보통은 클러스터를 설정합니다. 이 설정 방법도 쉽습니다. erlang 쿠키를 맞춰주고, join_cluster 만 해주면 됩니다. RabbitMQ HA Guide를 참고하시면 쉽습니다.

그러면 무엇을 해야하는가? 바로 이 Cluster가 그냥 쓰면 장애의 원인이 되기 때문입니다. 기본적으로, RabbitMQ에서 Cluster 모드가 되면, 채널 정보가 보통 Disk Node 에 저장되게 되는데, 이 서버가 내려가면… 해당 채널들은 모두 사용이 불가능해집니다.(장애시, HA를 하려고 설정한건데… 도리어, 장애가 나는거죠. 물론 그냥 죽어도 장애긴 합니다.)

그럼 제대로 RabbitMQ에서 HA를 구성하는 방법은 무엇일까요?

바로 mirrored queue를 운영하는 것입니다. RabbitMQ는 mirroed queue 라고 해서, 해당 Queue의 내용이 클러스터된 서버내에 중복되어 저장되는 기능을 의미합니다. 다만, 디폴트 설정으로는 설정이 된 시점의 데이터 부터, 복사가 되고 그 이전의 데이터는 복사되지 않습니다. (물론 이것도 옵션에 의해서 모두 가져가도록 할 수 있습니다.)

Mirrored Queue가 구성하는 방법 역시 RabbitMQ HA Guide에 잘 나와있습니다.

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}

이러면… ha. 으로 시작하는 queue 들은 mirrored 형태로 구성하겠다는 것입니다. 실제 web ui에서 보면 Queue 이름에 두대로 클러스터가 되었다면 ha.test +1, 세 대라면 ha.test +2 식으로 구성되어야만 제대로 설정이 된 것입니다.

이제 어떤 서버로 연결할 것인가의 이슈가 생깁니다. 앞에 Load Balancer 를 붙여서(haproxy든 L4든) 장애난 서버를 제외하고 연결하게 하면… 제대로 된 서비스가 가능해집니다.

[입 개발] Redis AOF Rewrite 설정에 관하여…

오래간만에 Redis 관련 블로깅을 하네요.(요새는 실력 부족이라… 적을 내용이…) 먼저 실제 내용 자체는 지인의 질문에 대한 답변을 해드리다가, 정리할 필요가 있어서 이렇게 적습니다.(그렇습니다. 나중에 제가 까먹을까봐지요. 그 때 소스 다시 보기는 그러니…)

Redis에서 Persist 를 제공하는 방식은 RDB/AOF 두 가지가 있습니다. 뭐, 당연히 이쯤 되면 다들 기본적으로 이게 뭔지는 아실테니… 자세한 내용은 넘어가고요. 원래 AOF는 Redis 의 Comment Handler가 처리하는 명령들 중에 write관련 커맨드들만, 디스크에 Redis Protocol 그대로 저장하는 방식입니다.(현재 이에 대해 압축을 해볼까 하는 얘기들도 오고가고 있습니다만… 일단 이건 논외로…)

그런데 로그중에 다음과 같은 에러를 보셨다고 합니다.

"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."

사실 Redis는 스레드를 몇 개 사용하긴 합니다. 그 중에 하나가 fsync를 스레드에서 실행하는 거죠. appendfsync 가 everysec 일 때만, 백그라운드로 돕니다. fsync 자체에서 대기하게 되면, redis가 아무작업도 못하게 되니…

appendfsync everysec

하여튼 fsync가 큐에 들어갔는데, 아직 실행이 안끝났다. 디스크가 바쁘거나, 뭔가 이슈가 있을꺼라라는 뜻이죠. 실제로 디스크가 좀 이상한 느낌이라고…

삼천포로 빠졌는데, 다시 AOF rewrite 얘기로 넘어가면, 실제로 모든 write 성 작업이 저장되기 때문에 AOF 파일이 한없이 커질 수가 있습니다. 그래서 특정시점에, 해당 파일 사이즈가 얼마 이상이면, rdb 생성 처럼 fork 후에 현재 메모리 정보를 전부 AOF 형식으로 다시 쓰게 됩니다.(그럼 사이즈가 줄어들 가능성이 있겠죠?, 변경이나 삭제가 한번도 없었다면, 줄어들지 않을수도…)

그럼 당연한 의문이 생깁니다. 어느 정도 되어야 rewrite가 일어날까? 관련 옵션이 바로 아래 두가지 입니다.

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

일단 rewrite가 rdb 처럼 fork를 이용하므로 사용하지 않고 싶다면 위의 auto-aof-rewrite-percentage 값을 0으로 설정하면 됩니다. auto-aof-rewrite-min-size 는 일단 최소 이것보다는 사이즈가 커야 rewrite 하라는 뜻입니다.

내부적으로 코드를 살펴보면 다음과 같습니다.

         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }

내부적으로 aof_current_size 와 aof_rewrite_base_size 를 사용하는데… AOF를 처음 만들면… 마지막으로 만들어졌던 AOF 파일의 크기가 aof_rewrite_base_size 가 되고 현재 AOF 파일의 크기가 aof_current_size 가 됩니다. 즉 auto-aof-rewrite-percentage 이 100이면, 이전에 만들어진 AOF가 1G 였다면, 2G가 되면(즉 100% 커지면) 하라는 뜻이 됩니다. 그리고 위의 조건 문에 의해서 0이면 실제로 AOF rewrite는 동작하지 않습니다.

[입개발] Redis Command 구조체에 대해서…

오늘 올라온 Redis 버그 중에 클러스터에서 geo command가 redirect를 먹지 않는다 라는 버그가 있었습니다. 그런데 antirez가 아주 쉽게 버그를 수정하면서 패치가 되었다고 답변을 답니다.(이 버그는 cluster 모드일때만 발생합니다.) 그리고 해당 글은 꼭 끝까지 보셔야 합니다!!!

https://github.com/antirez/redis/issues/2671

그리고 그 패치라는 것도 다음과 같습니다.
https://github.com/antirez/redis/commit/5c4fcaf3fe448c5575a9911edbcd421c6dbebb54

     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
     {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
     {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
-    {"geohash",geohashCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"geopos",geoposCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"geodist",geodistCommand,-4,"r",0,NULL,0,0,0,0,0},
+    {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
+    {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
+    {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
     {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,1,1,0,0},

0, 0, 0 이 1, 1, 1 로만 바뀐거죠. 그럼 도대체 저 값들은 뭘 의미하는 걸까요? 그걸 위해서 이제 Redis Command 구조체를 알아보도록 하겠습니다.

먼저 Redis Command 구조체는 다음과 같습니다.

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};

먼저 위의 샘플 커맨드와 하나를 비교해보면

     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},

name 과 proc는 각각 명확합니다. name은 사용자로 부터 입력받는 명령어이고, proc는 해당 명령어에 연결된 함수포인터입니다. 즉 name으로 사용자가 입력하면 proc가 실행되는 것이죠.

이제 arity 부터가 영어를 알면 쉽게 알 수 있습니다. 그렇습니다. 영어를 모르면 어렵지만…(전 방금 사전을…)
다음과 같은 뜻이 있습니다.

(logic, mathematics, computer science) The number of arguments or operands a function or operation takes. For a relation, the number of domains in the corresponding Cartesian product.

즉 파라메터의 개수입니다. 다음 코드를 보시죠. arity가 양수일 때는 파라매터 개수랑 동일해야 하고(고정적), 음수이면, 절대값으로 이값보다는 많거나 같아야 하는거죠(동적). 즉 -2 면 파라매터가 2개 이상이어야 하는거고, 2면 딱 2개여야 하는겁니다.

    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

sflags는 명령어의 속성을 문자열로 나타냅니다. 각 속성은 populateCommandTable 함수에 잘 나타나 있습니다.

            case 'w': c->flags |= REDIS_CMD_WRITE; break;
            case 'r': c->flags |= REDIS_CMD_READONLY; break;
            case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
            case 'a': c->flags |= REDIS_CMD_ADMIN; break;
            case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
            case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
            case 'R': c->flags |= REDIS_CMD_RANDOM; break;
            case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
            case 'l': c->flags |= REDIS_CMD_LOADING; break;
            case 't': c->flags |= REDIS_CMD_STALE; break;
            case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
            case 'k': c->flags |= REDIS_CMD_ASKING; break;
            case 'F': c->flags |= REDIS_CMD_FAST; break;

flags는 값이 모두 0입니다. 이 값은 로딩시에 sflags 로 부터 계산해서 만들어집니다. 위의 populateCommandTable 함수를 보시면 sflags 속성에 따라서 c->flags를 만드는 코드가 바로 보입니다. 왜 이렇게 구현했을까요? 아마 숫자값으로 설정되어 있는것 보다, 문자열이 보기 쉬워서가 아닐까 합니다.

getkeys_proc 는 파라매터만 가지고는 key를 찾기가 힘들때, 보조적으로 사용하는 함수의 함수포인터입니다. 위의 예에서는 전부 NULL 이지만, 다음과 같은 커맨들들이 있습니다.

    {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},

zunionInterGetKeys 의 형태를 살펴보면 다음과 같습니다.

/* Helper function to extract keys from following commands:
 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    /* Keys in z{union,inter}store come from two places:
     * argv[1] = storage key,
     * argv[3...n] = keys to intersect */
    keys = zmalloc(sizeof(int)*(num+1));

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    /* Finally add the argv[1] key position (the storage key target). */
    keys[num] = 1;
    *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
    return keys;
}

그리고 getkeys_proc 를 쓰는 곳은 db.c 에서 getKeysFromCommand 에서 다음과 같이 사용하고 있습니다.

int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    if (cmd->getkeys_proc) {
        return cmd->getkeys_proc(cmd,argv,argc,numkeys);
    } else {
        return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
    }
}

그런데 재미있는 것은 getkeys_proc 는 실제로 코드는 예전부터 존재했으나, 거의 쓰이지 않던 파라매터라는 것입니다. 실제로 현재 버전에서도 commandCommand 와 cluster 정보를 얻기위해서만 쓰고 있습니다.

firstkey, lastkey, keystep 은 각각, 첫번째 파라매터가 key 된다. 마지막 파라매터가 key 가 된다. keystep은 1이면 [key, key, key] 형태고, 2이면 [key, val, key, val, key, val] 형태가 되는 것을 의미합니다. 그래서 이 값이, 0, 0, 0 이면 geohash가 다음과 같은 결과를 냅니다.

127.0.0.1:6379> GEOHASH Sicily Palermo Catania
(empty list or set)

위의 값이 1로 셋팅이 되면… 다음과 같이 redirect 결과가 나옵니다.

127.0.0.1:6381> GEOHASH Sicily Palermo Catania
(error) MOVED 10713 127.0.0.1:6380

물론 둘 다 해당 서버인 6380에서 실행하면 정상적인 결과가 나옵니다.

127.0.0.1:6380> GEOHASH Sicily Palermo Catania
1) "sqc8b49rny0"
2) "sqdtr74hyu0"

그런데 왜 이게 이런 차이가 날까요? 아까 살짝 언급한게 “getkeys_proc 는 commandCommand 와 cluster 정보를 얻기위해서만 쓰고 있습니다.” 라고 말했습니다. 저 firstkey, lastkey, keystep 도 상단의 getKeysUsingCommandTable 즉 getKeysFromCommand 에서만 사용되고 있습니다.

다시말하면, 결국 cluster 에서 뭔가 정보를 얻을때, firstkey, lastkey, keystep을 쓴다는 것입니다. 그냥 커맨드가 독립적으로 실행될 때는 안쓴다는 T.T

코드를 찾아보면 cluster.c 의 getNodeByQuery 에서 위의 getKeysFromCommand 를 쓰고 있습니다. 어떤용도로 쓰고 있을까요?

geohash 명령을 cluster 모드에서 받았다고 할때… 실제로 getKeysFromCommand 는 getKeysUsingCommandTable를 호출하게 되는데 cmd->firstkey 는 0이므로 NULL이 리턴됩니다. numkey도 0이 셋팅됩니다.

int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkey>
    int j, i = 0, last, *keys;
    REDIS_NOTUSED(argv);

    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }
    last = cmd->lastkey;
    if (last < 0) last = argc+last;
    keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
        redisAssert(j < argc);
        keys[i++] = j;
    }
    *numkeys = i;
    return keys;
}

그래서 getNodeByQuery에서는 다음과 같은 코드가 실행됩니다.

clusterNode *n = NULL
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);

//......

if (n == NULL) return myself;

즉 numkey가 0이라 노드를 찾는 작업을 하지 않고 n == NULL 이라서 자기자신에 속한다고 리턴해버립니다. 그래서 자기가 붙은 노드에서 검색하고 결과가 없어서 빈 값이 넘어가게 되는겁니다.

실제로 processCommand 소스를 보시면… 아래에 getNodeByQuery 를 호출하고, myself 가 아닐때에는 hashslot을 가진 서버로 clusterRedirectClient를 호출해서 보내게 되는데… 아까 위의 결과에서 myself가 리턴되기 때문에… 해당 서버에서 실행되게 되어서 결과가 없는 것입니다.

    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
            return REDIS_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_cod>
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    } 

딱, 이렇습니다라고 말해야 하지만… 사실 여기는 훼이크가 있습니다. 위의 processCommand의 코드를 자세히 살펴보면… 아래와 같은 코드가 있습니다.

    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)

네 그렇습니다. getkeys_proc 가 없고, firstkey가 0이 아니어야 아래의 코드를 타는 것입니다. 미리 조건을 걸러낸것입니다. 그래서 실제로… 위의 코드를 타지 않고… 아예 바로 해당 서버에서 실행됩니다. 즉 myself를 리턴한것과 동일한 효과가 나는 것이지요. firstkey가 0이면 파라매터가 없는 명령으로 인식되어서 그냥 해당 서버에서 실행이 되는 것입니다.

살짝 속으셨다고 분노하시겠지만, 결국은 해당 코드도 그렇게 동작하게 됩니다. :) 호호호호… 그러면 즐거운 하루되시길…

[입 개발] Redis internal : Redis Dictionary Type 에 관해서

Redis 는 기본적으로 Hash 형태로 데이터를 관리하지만, 내부적으로 관리가 필요한 정보들 역시, 내부적으로는 Dictionary 라고 부르는 Hash 형태로 관리하고 있습니다. 그런데 이 Dictionary 에 대한 핸들링이, 관리되는 데이터의 종류에 따라서 다르게 처리되어야 할 때가 있습니다. 아마 오늘 글을 그냥 Redis를 쓰시는 분 입장에서는 별 내용이 없고, Redis 소스를 건드리시는 분들에게는 아주 살짝 도움이 될듯합니다.

보통 대부분의 언어에서는 함수 오버라이딩등을 이용한 폴리모피즘을 이용해서 이런 방식의 요구사항을 좀 수월하게 처리하게 되어 있습니다. 그러나 C 에서는… 기본적으로 이런 방식이 제공되지 않지만, 함수 포인터를 이용해서 이런 방식을 구현할 수 있습니다. 가장 유명한 예가, 리눅스 커널의 VFS 같은 부분을 보면 됩니다.

cluster.c 의 clusterInit 코드를 보면 다음과 같이 dictCreate 함수를 이용해서 dictionary를 생성하는 것을 볼 수 있습니다.

    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);

dictCreate 함수를 살펴보면 다음과 같이 dictType, privDataPtr 두개의 파라매터를 가지고, 결과로 dict 를 넘겨줍니다.

dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
    return d;
}

먼저 dict 구조체부터 살펴보도록 하겠습니다.

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

dict 구조체는 dictType 과 dictht 두개를 가집니다. 여기서 dictht는 dict의 테이블 확장시에 사용하기 위한 것입니다. 그럼 이 dict 안에 있는 dictType은 뭘까요? dictType은 다음과 같이 정의되어 있습니다.

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

dictType 을 보시면 hashFuction, keyDup, valDup, keyCompare, keyDestructor, valDestructor 같은 값들이 정의되어 있습니다. 즉, 함수포인터를 가지고 있기 때문에, 여기서 가리키는 함수의 동작이 다르면, 같은 형태로 보이더라도 서로 다르게 동작하게 할 수 있습니다.

그리고 현재 Redis 에는 다음과 같이 8개의 dictType이 정의되어 있습니다.

  • dictType setDictType;
  • dictType clusterNodesDictType;
  • dictType clusterNodesBlackListDictType;
  • dictType dbDictType;
  • dictType shaScriptObjectDictType;
  • dictType hashDictType;
  • dictType replScriptCacheDictType;

dictType 구조체 안에서 특히 keyCompare 를 통해서 해당 키를 찾아내게 됩니다.
예를 들어서, 내부적으로 실제 키들을 저장하는 곳에서는 dbDictType 을 사용합니다. dbDictType을 보면 다음과 같습니다.

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};

dbDictType에서는 Key의 비교를 위해서는 dictSdsKeyCompare 를 사용하고, dictSdsKeyCompare는 다음과 같이 구현되어 있습니다.

int dictSdsKeyCompare(void *privdata, const void *key1,
        const void *key2)
{
    int l1,l2;
    DICT_NOTUSED(privdata);

    l1 = sdslen((sds)key1);
    l2 = sdslen((sds)key2);
    if (l1 != l2) return 0;
    return memcmp(key1, key2, l1) == 0;
}

그리고 Redis Command 를 저장하는 곳에서 사용하는 commandTableDictType 에서는 대소문자 구분이 필요없을 때는 dictSdsKeyCaseCompare 를 사용합니다.

int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    DICT_NOTUSED(privdata);

    return strcasecmp(key1, key2) == 0;
}

위와 같은 형태에서 dictFind를 살펴보면 내부적으로 dictType 함수포인터를 이용하게 됩니다.

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

#define dictCompareKeys(d, key1, key2) \
    (((d)->type->keyCompare) ? \
        (d)->type->keyCompare((d)->privdata, key1, key2) : \
        (key1) == (key2))

위의 dictCompareKeys가 내부적으로 dictType의 keyCompare를 이용하는 것을 볼 수 있습니다. 그런데 여기서 조심해야 하는 것들이 있습니다. 실제 dictFind 에서 key를 넘겨줄 때, dictSdsKeyCompare 는 key가 무조건 sds type이어야 하지만, dictSdsKeyCaseCompare 에서는 실제 값만 비교하므로, 단순 string 이어도 가능합니다. 말 그대로 가능만 합니다. 그런데, 뭔가 잘못 쓰기 시작하면… Redis가 뻥뻥 죽어나가는 걸 보실 수 있을껍니다.

그래서 cluster.c 소스를 보면, 무조건 key가 sds 형태여야 하기 때문에 발생하는 사소한 오버헤드도 존재합니다. 뭐, 그러나 자주 발생하지는 않으니… 그냥 패스를 ㅎㅎㅎ

[입 개발] Scala의 거시기 : _(underscore) 의 용법 정리

Scala 를 사용하면 만나게 되는 여러 문법 중에, 처음 접하는 이들의 머리를 깨는 것이 있으니, 그게 바로 Scala에서 거시기로 통하는 _(underscore) 입니다. 이게 뭐지 하고 고민하는 중에 팀분이(멀린 사랑해요.) 아래 자료를 알려주셨습니다. 그리고 이 블로그는 아래의 문서를 풀이하는 것입니다. 사실 아래의 문서만 봐도 거의 모든 것이 이해되지만, 제 부족한 머리를 위해서 정리해둡니다. 참고로 dreaded 는 “무서운” 이런뜻입니다.

첫 슬라이드는 다음과 같습니다. 뭔가 어려워보이죠? 각종 _ 의 용법은 모두 들어가 있습니다.
아주 간단하게 설명하면 아래의 offset 에 대입되는 커링 함수 sum2(count) 에서 count는 매번 실행시의 count 값이 바인딩됩니다.(악 벌써 여기부터 어려워!!!)

class Underscores {
	import collection.{ Map => _ , _ }

	var count : Int = _

	def sum = (_:Int) + (_:Int)
	def sum2(a:Int)(b:Int) = a+b
	def offset = sum2(count) _

	def sizeOf(l:Traversable[_]) : Unit = l match {
		case it:Iterable[Int] => count = (0/:it)(_ + _)
		case s:Seq[_] => s.foreach( _ => count = count + 1)
		case _ => println(offset(l.size))
	}
}

두번째 슬라이드는 각각 어떤 용법으로 이루어졌나를 보여줍니다. 총 6가지 용법을 색상까지 이쁘게 보여주네요.

세번째 슬라이드 부터 각각의 용법에 대해서 알려줍니다.

1번은 “모두”를 의미합니다. 아래의 예에서 첫번째 Map => _ 는 일단 무시하시고(5번이니깐요.)
그 뒤의 _ 가 자바에서의 import * 와 같은 용법입니다.(왜 *가 아닌지는…)

	import collection.{ Map => _ , _ }

실제 예는 다음과 같습니다.

import java.util._
val date = new Date()

import scala.util.control.Breaks._
breakable {
	for (i <- 0 to 10) { if (i == 5) break }
}

웬지 breakable은 Exception을 잡아서 나가는 것 같은 느낌이 진하게 납니다.

2번은 디폴트 값 지정입니다. 숫자면 0 문자면 null로 지정됩니다.

	var count : Int = _

다만 이렇게 지정하는 건 생성자에서만 되고 함수안에서는 되지 않습니다.

class Foo {
	var i:Int = _ // i = 0
	var s:String = _ // s = null

	def f {
	// var i:String = _//error: local variables must be initialized
	}
}

3번째는 unused variables 입니다. 아래와 같이 _로 받은걸 쓰지 않게 되는 겁니다.

		case _ => println(offset(l.size))

예를 보면 다음과 같습니다. 아래의 두 예는 같은 예입니다. x를 파라매터로 받지만… 쓰지 않는 겁니다.

(1 to 5) foreach { (x:Int) => println("one more")}
(1 to 5) foreach { _ => println("one more")}

다음 예제들도 동일합니다.

def inPatternMatching1(s:String) {
	s match {
		case "foo" => println("foo !")
		case x => println("not foo")
	}
}

def inPatternMatching2(s:String) {
	s match {
		case "foo" => println("foo !")
		case _ => println("not foo")
	}
}

4번째는 이름 없는 파라매터입니다. 아주 명확하게 들어갈 변수 대신에 지정되게 됩니다.(3번하고는 반대입죠.)
아래의 예는 1…10 까지가 x로 바인딩되는데, 이걸 _로 대체하는 경우입니다. 명시적으로 사용하기 위해서이죠.

(1 to 10) map { x => x + 1}
(1 to 10) map { _ + 1}

(1 to 10).foldLeft(0) { (x,y) => x+y }
(1 to 10).foldLeft(0) { _+_ }

이제 partial function 에서 보면 더더욱 눈이 휘둥굴해 집니다.

def f(i:Int) : String = i.toString

def g = (x:Int) => f(x)
def g2 = f _
def f2 = (_:String).toString

def u(i:Int)(d:Double) : String = i.toString + d.toString

def v = u _

def w1 = u(4) _

def w2 = u(_:Int)2.0)

5번은 아까 Map 관련 헤더들을 import 하지 말라는 뜻입니다.
6번은 c++의 Template 처럼 특정 타입을 지칭하는 것입니다. 위에서 넘어온 타입을 그대로 사용하겠다라고 하면 이해가 쉬울까요.

다시 한번 말씀드리지만, 슬라이드가 잘 되어있으니, 실제로 보시면 꽤 도움이 되실껍니다. 뭐, 저도 공부하는 중이라…

[입 개발] Scala 의 App Trait는 어떻게 동작하는가?

요새 스칼라 스터디를 하고 있는데…(스칼라 어려워요 흑흑흑, 전 스맹 T.T) 아주 여러가지 기능들이 있습니다. 그런데 첫 부분에 나오는 예제부터 머리속을 땡땡 때리는 경우가 있습니다. 간단한 예를 들자면, tuple의 파라매터가 22개 밖에 안되는 것은 실제로 tuple1 ~ tuple22 까지의 클래스가 있어서 처리된다는 것(tuple 은 다시 product 이라는 것을 상속받는…)

보통 우리가 언어를 처음 배울때 쓰는 첫 예제는… 반가워 세상입니다. 즉 Hello World! 를 출력하는 것이죠.

 object HelloWorld {
    def main(args: Array[String]) {
      println("Hello, world!")
    }
  }

그런데 App 이라는 trait 를 상속받으면 다음과 같은 형태로 똑같이 동작이 됩니다.

object HelloWorld extends App {
  println("Hello, world!")
}

사실 스칼라를 공부하는 사람이야 그냥 당연하게 넘어갈 수 있지만, 두 번째 예의 경우는 println 코드가 있는 부분은 생성자입니다. 그런데 “어떻게 저게 자동으로 실행이 되는거지?” 라는 의문이 생기게 됩니다.(안생기면 500원…), 그리고 args 도 사용할 수 있습니다.

그래서 안을 조금 파보니…

App Trait 는 다시 DelayedInit 라는 Trait를 상속받습니다. 먼저 App Trait 부터 살짝 보도록 하겠습니다.

trait App extends DelayedInit {

  /** The time when the execution of this program started, in milliseconds since 1
    * January 1970 UTC. */
  @deprecatedOverriding("executionStart should not be overridden", "2.11.0")
  val executionStart: Long = currentTime

  /** The command line arguments passed to the application's `main` method.
   */
  @deprecatedOverriding("args should not be overridden", "2.11.0")
  protected def args: Array[String] = _args

  private var _args: Array[String] = _

  private val initCode = new ListBuffer[() => Unit]

  /** The init hook. This saves all initialization code for execution within `main`.
   *  This method is normally never called directly from user code.
   *  Instead it is called as compiler-generated code for those classes and objects
   *  (but not traits) that inherit from the `DelayedInit` trait and that do not
   *  themselves define a `delayedInit` method.
   *  @param body the initialization code to be stored for later execution
   */
  @deprecated("The delayedInit mechanism will disappear.", "2.11.0")
  override def delayedInit(body: => Unit) {
    initCode += (() => body)
  }

    /** The main method.
   *  This stores all arguments so that they can be retrieved with `args`
   *  and then executes all initialization code segments in the order in which
   *  they were passed to `delayedInit`.
   *  @param args the arguments passed to the main method
   */
  @deprecatedOverriding("main should not be overridden", "2.11.0")
  def main(args: Array[String]) = {
    this._args = args
    for (proc <- initCode) proc()
    if (util.Properties.propIsSet("scala.time")) {
      val total = currentTime - executionStart
      Console.println("[total " + total + "ms]")
    }
  }
}

젤 아래의 main을 보면, 아 여기서 실행되겠구나 할것입니다. 쉽네하고 보다보면, 다시 이상해집니다. 분명히 main이 보통 entrypoint 일텐데…(실제로는 object이니 이것을 실행하는 부분이 있긴하겠죠.) 뭔가 initCode 라는 것에서 proc를 가져와서 이걸 실행시킵니다.

그 위의 delayedInit 함수를 보니, body가 넘어와서 initCode에 저장됩니다.(여기서 body는 람다라고 보시면 될듯합니다.)

그럼 다시 처음으로 여기서 main이 실행되는 건 알겠는데… App Trait를 상속받은 object의 생성자를 실행을 시켜주는 걸로 봐서 아마도 위의 proc 가 App Trait를 상속받은 object의 생성자일꺼라는 예상을 할 수 있게 됩니다. 그러나, 여전히 delayedInit을 호출해주는 녀석은 보이지 않습니다. 다시 App Trait 가 DelayedInit Trait를 상속받으니, 이걸 살펴보도록 하겠습니다.

trait DelayedInit {
  def delayedInit(x: => Unit): Unit
}

악!!! 살펴볼 내용이 없습니다. 그냥 인터페이스만 정의가 되어있습니다. 그럼 뭔가 언어적으로 뭔가 해주지 않을까 싶습니다. 소스를 까보면 src/reflect/scala/reflect/internal/Definitions.scala 에서 다음 코드를 발견할 수 있습니다.

def delayedInitMethod = getMemberMethod(DelayedInitClass, nme.delayedInit)

해당 클래스에서 delayedInit를 뽑아내는 것 같습니다.

그리고 src/compiler/scala/tools/nsc/transform/Constructors.scala 를 보면 다음 코드가 있습니다.

    private def delayedInitCall(closure: Tree) = localTyper.typedPos(impl.pos) {
      gen.mkMethodCall(This(clazz), delayedInitMethod, Nil, List(New(closure.symbol.tpe, This(clazz))))
    }

그리고 위의 delayedInitCall은 rewriteDelayedInit() 에서 사용하고 있습니다. delayedInitCall을 실제로
호출하게 됩니다. 즉 여기서 아까 delayedInit가 호출되면서 App Trait 의 initCode 쪽에 생성자를 넣어주는 것입니다. 그래서 실제로 App Trait 의 main에서 그걸 호출하게 되는거죠.

    def rewriteDelayedInit() {
      /* XXX This is not corect: remainingConstrStats.nonEmpty excludes too much,
       * but excluding it includes too much.  The constructor sequence being mimicked
       * needs to be reproduced with total fidelity.
       *
       * See test case files/run/bug4680.scala, the output of which is wrong in many
       * particulars.
       */
      val needsDelayedInit = (isDelayedInitSubclass && remainingConstrStats.nonEmpty)

      if (needsDelayedInit) {
        val delayedHook: DefDef = delayedEndpointDef(remainingConstrStats)
        defBuf += delayedHook
        val hookCallerClass = {
          // transform to make the closure-class' default constructor assign the the outer instance to its pa>
          val drillDown = new ConstructorTransformer(unit)
          drillDown transform delayedInitClosure(delayedHook.symbol.asInstanceOf[MethodSymbol])
        }
        defBuf += hookCallerClass
        remainingConstrStats = delayedInitCall(hookCallerClass) :: Nil
      }
    }

마지막으로 Constructors.scala 안에서 다시 rewriteDelayedInit를 실행합니다. 그래서 App Trait를 상속받을 경우 생성자에만 코드를 넣어두면 실행이 되는 것입니다.

뭐, 이게 맞는 플로우인지는 정확하게 보증은 못합니다. 저도 이제 막 스칼라를 공부하는 중이고, 아무리 봐도, 스칼라를 편안하게 쓰지는 못할듯 하네요 T.T 흑흑흑