[입 개발] Redis 접속이 안되요!!! – Protected Mode

최근에 자주 다음 질문을 받는 케이스가 늘어서 정리합니다.(사실 저도 당한…)
최신 버전 3.2.x 을 설치하고 클라이언트를 접속하고 보면… 접속이 안됩니다.

일단 client 로 접속을 하게 되면 다음과 같은 에러를 볼 수 있습니다.
(항상이 아니라 흔히 볼 수 있습니다.)

“-DENIED Redis is running in protected mode because protected ”
“mode is enabled, no bind address was specified, no ”
“authentication password is requested to clients. In this mode ”
“connections are only accepted from the loopback interface. ”
“If you want to connect from external computers to Redis you ”
“may adopt one of the following solutions: ”
“1) Just disable protected mode sending the command ”
“‘CONFIG SET protected-mode no’ from the loopback interface ”
“by connecting to Redis from the same host the server is ”
“running, however MAKE SURE Redis is not publicly accessible ”
“from internet if you do so. Use CONFIG REWRITE to make this ”
“change permanent. ”
“2) Alternatively you can just disable the protected mode by ”
“editing the Redis configuration file, and setting the protected ”
“mode option to ‘no’, and then restarting the server. ”
“3) If you started the server manually just for testing, restart ”
“it with the ‘–protected-mode no’ option. ”
“4) Setup a bind address or an authentication password. ”
“NOTE: You only need to do one of the above things in order for ”
“the server to start accepting connections from the outside.\r\n”;

이유가 무엇인고 하면 3.2.x 부터 Redis 에 Protected mode 라는 것이 생겼습니다. 이 Protected Mode라는 것은 또 무엇인고 하니, 혹시 예전의 보안 사고를 기억하시나요? Redis 는 굉장히 보안에 취약합니다. 특히 public 으로 열어두면 거의 해킹의 온상이 되는… 방법은 그렇게 공개하지는 않겠습니다.

그래서 추가된 것이 이 protected mode 입니다. protected mode 가 설정되어 있는 상태에서 패스워드가 설정되지 않고, 특정 ip로 bind가 되어있지 않으면, connection 자체가 위의 에러를 내면서 실패하게 됩니다.

그런데 이런 문의가 급증하는 것은 이 protected mode 가 default yes 이고 보통 특정 ip로 bind 시키지 않고 requirepass 를 지정하지 않습니다. 보통은 내부망에서만 쓰라는 얘기가 되는거죠. 그래서 이걸 해결 하기 위해서는 다음 명령을 127.0.0.1 즉 local loopback 주소에서 접속한 다음 날려야 합니다.

config set protected-mode no

 

실제 코드를 보면 다음 부분에서 문제가 되는겁니다. src/networking.c 에 있습니다.

    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                "mode is enabled, no bind address was specified, no "
                "authentication password is requested to clients. In this mode "
                "connections are only accepted from the loopback interface. "
                "If you want to connect from external computers to Redis you "
                "may adopt one of the following solutions: "
                "1) Just disable protected mode sending the command "
                "'CONFIG SET protected-mode no' from the loopback interface "
                "by connecting to Redis from the same host the server is "
                "running, however MAKE SURE Redis is not publicly accessible "
                "from internet if you do so. Use CONFIG REWRITE to make this "
                "change permanent. "
                "2) Alternatively you can just disable the protected mode by "
                "editing the Redis configuration file, and setting the protected "
                "mode option to 'no', and then restarting the server. "
                "3) If you started the server manually just for testing, restart "
                "it with the '--protected-mode no' option. "
                "4) Setup a bind address or an authentication password. "
                "NOTE: You only need to do one of the above things in order for "
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

 

점점 보안이 중요해지네요. 제 acl 패치는 언제 받아들여질지 T.T

[입 개발] Memcached 에서 incr/decr 은 음수에 대해서는 사용할 수 없습니다.

오늘 우리 팀의 조실장님이 Memcached 관련해서 에러가 난다고 보고를 해주셨습니다. 실제 분석까지 대충 다 끝낸… 조실장 화이팅!!!

그래서 과연 그런가 싶어서 memcached 소스를 먼저 열었습니다. 일단 증상은 다음과 같습니다. memcached 에 값을 음수로 설정하고  incr/decr 을 하면 에러가 발생한다.  일단 조건들은 다음과 같습니다.

  • client library 는 spymemcached

 

enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
                                    const bool incr, const int64_t delta,
                                    char *buf, uint64_t *cas,
                                    const uint32_t hv) {
    char *ptr;
    uint64_t value;
    int res;
    item *it;

    it = do_item_get(key, nkey, hv);
    if (!it) {
        return DELTA_ITEM_NOT_FOUND;
    }

    /* Can't delta zero byte values. 2-byte are the "\r\n" */
    if (it->nbytes <= 2) {
        return NON_NUMERIC;
    }

    if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
        do_item_remove(it);
        return DELTA_ITEM_CAS_MISMATCH;
    }

    ptr = ITEM_data(it);

    if (!safe_strtoull(ptr, &value)) {
        do_item_remove(it);
        return NON_NUMERIC;
    }

    if (incr) {
        value += delta;
        MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
    } else {
        if(delta > value) {
            value = 0;
        } else {
            value -= delta;
        }
        MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
    }

    pthread_mutex_lock(&c->thread->stats.mutex);
    if (incr) {
        c->thread->stats.slab_stats[ITEM_clsid(it)].incr_hits++;
    } else {
        c->thread->stats.slab_stats[ITEM_clsid(it)].decr_hits++;
    }
    pthread_mutex_unlock(&c->thread->stats.mutex);

    snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
    res = strlen(buf);
    /* refcount == 2 means we are the only ones holding the item, and it is
     * linked. We hold the item's lock in this function, so refcount cannot
     * increase. */

    if (res + 2 <= it->nbytes && it->refcount == 2) { /* replace in-place */
        /* When changing the value without replacing the item, we
           need to update the CAS on the existing item. */
        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);

        memcpy(ITEM_data(it), buf, res);
        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
        do_item_update(it);
    } else if (it->refcount > 1) {
        item *new_it;
        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2, hv);
        if (new_it == 0) {
            do_item_remove(it);
            return EOM;
        }
        memcpy(ITEM_data(new_it), buf, res);
        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
        item_replace(it, new_it, hv);
        // Overwrite the older item's CAS with our new CAS since we're
        // returning the CAS of the old item below.
        ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
        do_item_remove(new_it);       /* release our reference */
    } else {
        /* Should never get here. This means we somehow fetched an unlinked
         * item. TODO: Add a counter? */
        if (settings.verbose) {
            fprintf(stderr, "Tried to do incr/decr on invalid item\n");
        }
        if (it->refcount == 1)
            do_item_remove(it);
        return DELTA_ITEM_NOT_FOUND;
    }

    if (cas) {
        *cas = ITEM_get_cas(it);    /* swap the incoming CAS value */
    }
    do_item_remove(it);         /* release our reference */
    return OK;
}

NON_NUMERIC 에러를 리턴하는 경우는 코드에서 2가지 입니다.

  • 기존 아이템이 2글자 이하일 경우
  • safe_strtoull 결과가 false 일 때

 

이제 다시 safe_strtoull 함수를 살펴보면 다음과 같습니다. 처음에 데이터를 unsigned long long 으로 받으므로 해당 값을 singed long long 으로 바꾸고… 이게 0보다 적으면(즉 overflow 상황이면) 실제로 – 로 시작하는지 확인합니다. 그렇습니다. 이건 데이터가 문자열로 들어가 있다는 소리!!! 하여튼 왜 그런지는 모르겠지만 memcached 에서는 음수로 셋팅한 값을 incr/decr 하면 안됩니다.

bool safe_strtoull(const char *str, uint64_t *out) {
    assert(out != NULL);
    errno = 0;
    *out = 0;
    char *endptr;
    unsigned long long ull = strtoull(str, &endptr, 10);
    if ((errno == ERANGE) || (str == endptr)) {
        return false;
    }

    if (xisspace(*endptr) || (*endptr == '\0' && endptr != str)) {
        if ((long long) ull < 0) {
            /* only check for negative signs in the uncommon case when
             * the unsigned number is so big that it's negative as a
             * signed number. */
            if (strchr(str, '-') != NULL) {
                return false;
            }
        }
        *out = ull;
        return true;
    }
    return false;
}

 

재미난건 ascii 에서는 CLIENT_ERROR 에 메시지로 에러 값이, binary 에서는 PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL 이라는 0x06 값이 전달됩니다.

반면에 Redis는 그런 구분 없이 음수형태도 그대로 저장됩니다.

void incrDecrCommand(client *c, long long incr) {
    long long value, oldvalue;
    robj *o, *new;

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL && checkType(c,o,OBJ_STRING)) return;
    if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

    oldvalue = value;
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    value += incr;

    if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
        (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
        value >= LONG_MIN && value <= LONG_MAX)
    {
        new = o;
        o->ptr = (void*)((long)value);
    } else {
        new = createStringObjectFromLongLong(value);
        if (o) {
            dbOverwrite(c->db,c->argv[1],new);
        } else {
            dbAdd(c->db,c->argv[1],new);
        }
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,new);
    addReply(c,shared.crlf);
}

[책 리뷰] 알고리즘 문제 풀이 전략

해당 리뷰는 “한빛미디어”에서 제공하는 도서를 이용하여 리뷰를 진행하였습니다.

이 책의 제목은 미사여구를 포함해서 “프로그래머의 취업, 이직을 결정하는 알고리즘 문제 풀이 전략” 입니다.  솔직히 말해서, 국내는 알고리즘 문제를 이용해서 기술 면접이 이루어지는 곳이 엄청 많지는 않습니다. 그리고, 약간 면접관에 의해서 케이스 바이 케이스인 경우도 많구요. 다만, 이제 점점 전화면접이나 온라인 홈워크나 문제 풀이를 이용해서 지원자를 스크리닝 하는 경우는 점점 늘어나고 있습니다. 다만, 외국의 특정 G사, F사 처럼 문제 풀이만 시키는 형태가 되기에는 아직은 시간이 걸릴듯 합니다.

아픈 기억이지만, 저도 실제로 알고리즘 문제만 푸는 면접을 2012년에 본 적이 있습니다.  이렇게 아픈 기억이라고 얘기하는 건, 똑 떨어졌다라는 이야기입니다. 5시간 동안 알고리즘만 풀면… 머리가 안돌아가는…

다시 책 이야기로 돌아와서, 이 책은 크게 두 부분으로 구성되어 있습니다. 기본적인 자료구조/알고리즘 부분…(C코드로 되어있는…) 나머지는 실제로 문제와 그에 대한 해답입니다. 개인적으로 이 책의 최대 장점은 문제 부분보다는, 앞쪽에 잘 정리된 자료구조/알고리즘 부분입니다. 이미지를 통해서 실제로 단계별로 어떻게 이렇게 진행되는지에 대해서 자세히 설명되어 있습니다.(간단한거는 그냥 쉽게 훅 넘기기도 합니다. ㅎㅎㅎ)

보통 알고리즘 문제를 내는 곳도 엄청 어려운 문제를 내지는 않습니다. 그리고 정답을 맞추는 것보다 정답을 맞추어가는 과정을 중요시합니다. 그래서 실제 이런 문제를 풀 때는, 자신이 생각하는 과정과 풀이 과정을 계속 면접관과 맞추어가는 것이 중요합니다.

그래서 어떤 자료구조를 선택하고 알고리즘을 선택할 것인가에 대해서 기본기가 튼튼해야 하는데, 이런 부분이 잘 갖춰져 있다는 겁니다. 그리고 문제를 다 풀어보는 것도 꽤나 도움이 될듯합니다. 실제로 같은 문제가 나오지는 않겠지만…(craking code interview 라는 책이 있는데,  인사이트에서 “코딩 인터뷰 완전분석” 이라고 번역되어 나왔습니다. 외국에서 이 책은 거의 취업쪽 바이블로 통하는…) 문제를 풀어보고 연습해 둔다는 것이 상당히 중요합니다. 실제 외국에서도 이직을 할려면 이런 쪽 문제랑 공부를 대략 6개월 정도 합니다.

사실 제가 약한 부분이던, 전위/중위/후위 순회라든지, 정렬쪽이 상당히 설명이 잘 되어있어서 좋았습니다. 여러 책을 보시겠지만, 앞에서 언급한 책과 함께 보시면 좋을듯 합니다.

 

[입 개발] spring-cloud-config-server 이야기…

spring cloud 에서 가장 중요한 서버를 뽑으라면, 일단 Eureka 서버를 뽑을 수 있습니다.(아니면, concul 이나, etcd 등등등) 그런데 이 Eureka를 잘 쓰기 위해서 먼저 설정해야 할 서버가 있습니다.(사실 Eureka 는 구조를 설명할 능력이 안되므로…)

twelve factors application 개발이라는 어려운 용어를 쓰는 것에 보면, 실제 코드와 설정을 분리하라는 말이 나옵니다. 한글로 번역된 좋은 페이지가 http://12factor.net/ko/ 를 보시면 됩니다.

이런 역할이 지금 얘기할 config server 입니다. 간단하게 설명하면, 다른 서버들의 설정을 제공하는 서비스입니다. 예를 들어, 어떤 API Server 가 있다면, 설정에 자신의 서비스 이름과 config server 주소만 적어주면 됩니다. 그리고 config server 에 해당 API Server에 사용될 port 정보라든지 여러가지 설정들이 들어가게 됩니다. 보통 시작 Port는 설정 파일이나, 코드에 박혀있는 경우도 흔한데, 이럴 경우 쉽게 설정을 변경하기가 어렵습니다.

즉 config server는 바뀌기 쉬운 설정을 외부에서 받아와서 각각의 서비스를 실행하게 하자가 핵심인 겁니다. 그래서 Eureka 서버라든지, API Gateway 라든지, 모든 서비스가 config server에 접근하여 해당 설정을 가져가게 됩니다.

그래서 spring cloud 를 쓸 때 가장 먼저 고민해야 할 것들이… config server 와 Eureka의 HA나 clustering 입니다. 전에 Eureka를 통해서 클러스터 멤버쉽을 관리해주는게 어떻게 보면 spring cloud의 핵심이라고 얘기했었는데, 당장 config server 와 Eureka 가 죽어버리면… 서비스에 문제가 생기게 됩니다.(정확히는 config server나 Eureka 가 죽었을 경우 기존 설정 값을 이용하게 되지만, 신규 장비가 추가되거나 빠져야 할때 이슈가 발생하게 됩니다.)

spring_cloud_01

실제로 Config Service 의 경우는 Stateless 한 서버라, 여러 개를 띄우고, L4든… 클라이언트에서 여러 대 중에 DNS RR이든 알아서 시도하게 하면… 큰 문제가 없습니다.(즉, 기존에  옛날부터 사용되던 HA 방법이 여기에 그대로 적용될 수 있습니다.)

Spring Cloud Config Server의 동작은 실제 코드의 경우는 이것보다 훨씬 복잡하지만,  단순하게 생각하면 아래와 같은 부분이 다입니다. 여기에 Rest Request Handler만 추가되어 있는… 즉 특정 설정을 읽어서 거기 내용을 읽어온 다음, 내부적으로 관리하는 겁니다.

import java.io.*;
import java.net.*;

public class Test {
public static void main(String [] args) {
try {
URL url = new URL("https://raw.githubusercontent.com/charsyam/springboot-config-server-config/master/reservation-service.properties");
BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));

String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
}
in.close();
} catch (Exception e) {
}
}
}

Eureka 서버의 경우에는 Eureka Replication을 만들 수 있습니다. 그리고, 여러 대 Eureka 서버의 주소를 적어두면, 알아서, 접속을 시도하게 됩니다. Eureka의 경우는 여러 IDC에 존재할 수도 있으므로, 자신이 속해있는 IDC의 Eureka 서버로 우선 접근하게 하는 그런 옵션도 제공합니다.

이런 분산 시스템을 만들때는… SPOF(Single Point of Failure) 를 없애는게 중요합니다. 서비스 자체는 잘 만들었는데,  이런 부분을 놓치는 경우가 많기 때문에 간단하게 정리했습니다.(이제 한동안 Spring Cloud는 패스를…)

 

[입개발] Cloud Native 와 Spring Cloud

Cloud Native 와 Spring Cloud

이 글은 Josh Long 의 Spring Cloud 강연을 들은 거의 후기와 평소에 제가 생각하는 것들을… 섞어서
제 마음대로 써볼려고 합니다. 일단 제가 이분야 꼬꼬마이니… 틀리시면 조용히 욕만… T.T 그리고 많은
지적질 감사하겠습니다.

참고: 관련 코드는 https://github.com/joshlong/cloud-native-workshop 에서 보실 수 있습니다.

먼저 고백하자면, 전 Java 도 잘 모르고, Spring은 더더욱 잘 모르고, Spring Cloud는 정말 세미나 두 번 들은게 다입니다.

아직 저도 잘 못읽어본…(솔직히 목차만 본…) Cloud Native Java 를 한번 보시는 것도 꽤 도움이 될듯합니다. 재미있는건 제가 Spring Cloud에 대해서 들었던 두 번이 각각 이 책의 저자인 Kenny 와 Josh 입니다.(영어도 잘하고, 심지어 기술도 좋은… 아, 얘들 미국인이지…)

그렇다면 먼저 Spring Cloud 야… 이번에 Spring 에서 미는 기술이고…(정확히는 컴포넌트의 연합?)
그 전에 Cloud Native는 무엇일까요?

한국어 해석은 매우 분분할 수 있는데… 클라우드에서 태어난, 클라우드향?, 클라우드에 적합한?
일단 이 개념을 전도하는(그 전도 맞습니다.) Pivotal 에서는 다음과 같이 정의하고 있습니다.

“Cloud Native describes the patterns of high performing organizations delivering software faster, consistently and reliably at scale. Continuous delivery, DevOps, and microservices label the why, how and what of the cloud natives.”

간단하게 해석하자면…(저 영어 못해요.) 소프트웨어를 빨리 개발하고 릴리즈하면서도, 확장성도 있고 안전성이 높게 만드는 패턴이라고 합니다.(이런 구라를… 그러나 현실에도 장동건처럼 잘생긴데, 돈도 잘벌고, 성격까지 좋은 사람들도…)

이런 기사도 있습니다. “http://www.informationweek.com/cloud/platform-as-a-service/cloud-native-what-it-means-why-it-matters/d/d-id/1321539&#8221;

실제 세미나도 Spring Boot를 소개하는 것으로 시작합니다. 큰 설정 없이 endpoint에 대한 핸들러만
설정해주고 구현해주는 것만으로 쉽게 비지니스 로직의 구현이 가능합니다.

예를 들어, @Get(“/profiles/me”) 이런 annotation을 이용하고 해당 핸들러를 구현하면… 바로
profiles/me 에 대한 url에 대한 처리가 가능해지는 겁니다. 그런데 Play나 뭐, 다른 쉬운 프레임워크
들을 사용해도 이런식의 개발은 쉽게 가능합니다.

일단 제가 Spring Boot 부터 잘 모르니… 자세한 부분은 넘기겠습니다.(그렇다면 내가 아는 것은 뭐지?)

이제 한번 서비스를 하나 만든다고 생각해보겠습니다. 서비스 포트는 8080번 이고, db 서버는 어디에 접속하고… 이런 정보들을 보통 설정 파일을 만들어서 넣어두고, 구동시 해당 값을 읽어서 서비스가 실행되게 됩니다. (그것도 아니면… 보통… 아예 고정으로 소스코드에 넣어둘수도…)

그런데 이러면… 사실… 각각의 서버들의 설정이 서로 다를 수 있습니다. 어디는 새로운 설정을 쓴다거나… 이럴 때의 좋은 방안은 설정 서버를 두는 것입니다. 그래서 서비스가 최초 구동시에 설정 서버에서 자신의 설정을 읽어서 이걸로 서비스를 시작하는 거죠. 이제 모든 서버들이 같은 설정으로 구동할 수 있습니다.

이런 걸 구축할려면… 사실… 여러가지를 알아야 합니다. 일반적으로 zookeeper 나, consul, etcd 같은 곳에 이런 설정을 저장하거나, DB나 다른 스토리지에 저장하기도 합니다. 그런데 이런걸 config-server 란 녀석이 해주는 겁니다.(세상 참 많이 좋아졌습니다.)

이제 설정은 config server에서 가져와서 실행이 되도록 했습니다. 그런데 실제로 서비스를 운영하다 보면… endpoint를 설정에 의해서 바꿔준다든지… 아니면 살짝 로직을 더 추가한다든지가 필요할 경우가 생기는…(두 개의 API를 묶는다든지..) 이렬 경우 뭔가 라우팅을 바꿔준다거나, 실제로 해당 API들을 통해서 뭔가 다른 작업을 할수 있는 API Gateway 를 만들어 주는 것이, 여러가지 방면에서 편리합니다. 여기서 Zuul 이라는 것을 이용해서 아주 쉽게 추가가 가능합니다.(API Gateway 는 사실 여러가지 녀석들이 있습니다.)

그런데 서비스를 운영하다보면, 당장 DB라든지, 아니면 다른 서드파티 서버들과 통신해야 하는 경우가 생깁니다. 예를 들어, 서비스가 분리되어 있는데, 계정 서비스를 호출해서 해당 사용자가 유효한 사용자인지를 체크한다고 합시다.  이럴 경우 일단 계정 서비스의 주소를 알아야 합니다. 서버 IP가 1.2.3.4 라고 하면, 우리는 1.2.3.4 서버의 주소를 가지고 여기로 리퀘스트를 보내게 됩니다.

그런데 이런 ip가 바뀌어야 한다면? 그렇다면 도메인을 쓰는 건 어떨가요? 도메인을 쓰면, 서버 주소가 바뀌더라도 도메인은 그대로이니… 이제 account.server.com 이라고 가정한다면, 이 도메인 주소를 이용하면서 좀 더 유연성이 증가했습니다. 그런데, 도메인의 경우는 클라이언트가 해당 주소를 바꿀 경우, 클라이언트가 해당 주소를 캐시하고 있을 수도 있습니다. 분산 시스템에서는 이런 문제를 위해서 Service Registery 라는 것을 정의하는데 여기서 Spring Cloud는 Eureka 라는 것을 제공합니다.

즉, 서비스가 뜰때, Eureka 에 자기는 A 서비스라고 등록합니다. 그리고 누군가 A 서비스의 목록을 주세요 라고 요청하면 Eureka 서버는 그 서버의 목록을 전달합니다. 심지어 서버가 죽으면 자동으로 해당 서버를 서비스 목록에서 제거합니다. 이런것을 보통 클러스터 멤버쉽이라고 부릅니다. (사실 꼭 이런게 아니더라도, IDC 라면 L4 라든지, VIP를 이용해 VVRP 를 이용한 ip 의 변경등을 할 수 있지만, cloud 환경에서는 이런것들이 쉽지만은 않습니다.)

거기다가 일반적으로 멤버쉽을 독립적으로 관리하면, 서버 목록 가져와서 거기서 라운드 로빈이든 랜덤하게든 선택한 다음 해당 주소로 접속하는 코드를 쫘야 하는데, feign 과 ribbon을 이용해서 해당 서비스에 좀더 동일한 방법으로 Eureka에 myservice 라는 이름으로 등록이 되어있으면, 호스트 이름이 account1.service.com 일때,
http://myservice/profiles/me&#8221; 라는 형태로 연결을 시도하면 내부적으로
http://account1.service.com/profiles/me&#8221; 로 알아서 도메인을 바꾸어줍니다.

그리고 CircuitBreaker 라는 개념을 손쉽게 적용할 수 있도록 해줍니다. hystrix 라는 것을 이용하는데, 예를 들어, 특정 서비스에서 A,B,C,D 라는 서비스의 결과를 가져와서 보여준다고 합니다. A,B,C,D가 독립적이라서 A,B,C는 잘 동작하는데 D서비스가 서비스 불능 상태에 빠져있다면… (Eureka 등에 등록된 서버가 하나도 없다면?)  로직에 따라서는 해당 서비스 때문에, 전체 서비스 결과가 안나올수도 있고, D 서비스의 결과가 Timeout 이 나야만 그제서야 결과가 보여질 수도 있습니다.(깨진 내용이나 등등등) circuitbreaker 를 이용하게 되면, 몇번 시도해서 해당 서비스가 장애라는 판단이 내부적으로 서게 되면, 미리 등록된 failback을 실행시켜줍니다. 그리고 백그라운드에서 해당 서비스가 살아나는지 체크해서 살아나게 되면, 원래 호출을 하도록 해주는 것이 CircuitBreaker 입니다.

사실 위에 설명한 Zuul, Eureka, Feign, Ribbon, Hystrix 등은 전부 Netflix에서 이미 오래전 부터
공개했던 컴포넌트들입니다. 그런데 Spring Cloud의 대단한 점은 이런것들을 아주 쓰기 쉽게 Integration 시켰다는 점입니다. 원래는 개별적으로 사용하기에 조금 어려운 것들을 간단한 설정이나 annotation 만으로 아주 손쉽게 사용할 수 있게 만들었습니다.

결론적으로 Spring Cloud는 여러가지 분산 시스템 개발에 필요한 개념들을 아주 사용하기 쉽게 만들었습니다. 사실 이런 개념들을 알고 적용하는가가… 대규모 서비스에서 개발할 때 필요하는 노하우이기도합니다.(클라우드 나올때도 이제 뭐먹고 살지 고민했는데, Spring Cloud 보니… 이제 나는 굶어죽겠다는 생각만…)

다만, 아주 쉽게 사용하도록 해두었지만… 사실 이걸 제대로 운영할 수준으로 적용하는 것은 쉬운 일이 아닙니다. 왜왜왜? 라고 물어보신다면, 결국 서비스라는 것은 운영에 대한 이슈가 생기기 때문입니다. 그래서 반대로 이 Cloud Native라는 형태가 cloud service 와 결합하면서, 상당히 큰 강점이 생기기도 합니다.(뭔가 모순적이지만… 이게 사실인…)

다시 정리하자면, Spring Cloud는 Cloud Native라는 개념을 효과적으로 풀어낸 구현체입니다. 굉장히 쉽게 만들어둔… 다만… Spring Cloud에서 사용하는 컴포넌트들의 의미를 잘 파악하면, Cloud Native라는 개념을 다른 언어, 다른 컴포넌트로도 풀어낼 수 있습니다.(그러면 저 같은 입개발자는 설 곳이 없어지는…)

클라우드라는 개념이 나왔을때도, 충격을 받았지만…(SE분들이랑 우린 이제 뭐 먹고 사나 했었는데…) Spring Cloud가 나온걸 보니… 또 한번… 굶어죽을 듯한 두려움과, 세상이 또 뭔가 재밌게 변하겠구나라는 생각이 듭니다.

 

 

[입 개발] memcached 소스분석 #2

item

해당 부분은 실제로 hashtable을 구축해서, item을 어떻게 저장하고 찾을 것인지에 대한 부분이다.

#define POWER_LARGEST 256 /* actual cap is 255 */

#define LARGEST_ID POWER_LARGEST

static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];

POWER_LARGEST는 slabclass의 최대 개수이다. 즉 item *heads, *tails는 slabclass 마다 존재한다고
생각하면 간단하다.

일단 프로토콜 파싱 부분을 보자. 여기서는 일단 text 프로토콜만 살펴본다.(실제로는 거의 유사하므로…)
protocol 파싱 부분은 memcached.c 에 존재한다. get은 데이터를 찾아오는 명령이다. 실제로 item_get
을 호출해서 item이 존재하는지 체크한다.

/* ntokens is overwritten here... shrug.. */
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
char *key;
size_t nkey;
int i = 0;
item *it;
token_t *key_token = &tokens[KEY_TOKEN];
char *suffix;
assert(c != NULL);

do {
while(key_token->length != 0) {

key = key_token->value;
nkey = key_token->length;

if(nkey > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}

it = item_get(key, nkey);
if (settings.detail_enabled) {
stats_prefix_record_get(key, nkey, NULL != it);
}
if (it) {
if (i >= c->isize) {
item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
if (new_list) {
c->isize *= 2;
c->ilist = new_list;
} else {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
item_remove(it);
break;
}
}

/*
* Construct the response. Each hit adds three elements to the
* outgoing data list:
* "VALUE "
* key
* " " + flags + " " + data length + "\r\n" + data (with \r\n)
*/

if (return_cas)
{
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
/* Goofy mid-flight realloc. */
if (i >= c->suffixsize) {
char **new_suffix_list = realloc(c->suffixlist,
sizeof(char *) * c->suffixsize * 2);
if (new_suffix_list) {
c->suffixsize *= 2;
c->suffixlist = new_suffix_list;
} else {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
item_remove(it);
break;
}
}

suffix = cache_alloc(c->thread->suffix_cache);
if (suffix == NULL) {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
out_of_memory(c, "SERVER_ERROR out of memory making CAS suffix");
item_remove(it);
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}
*(c->suffixlist + i) = suffix;
int suffix_len = snprintf(suffix, SUFFIX_SIZE,
" %llu\r\n",
(unsigned long long)ITEM_get_cas(it));
if (add_iov(c, "VALUE ", 6) != 0 ||
add_iov(c, ITEM_key(it), it->nkey) != 0 ||
add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
add_iov(c, suffix, suffix_len) != 0 ||
add_iov(c, ITEM_data(it), it->nbytes) != 0)
{
item_remove(it);
break;
}
}
else
{
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
if (add_iov(c, "VALUE ", 6) != 0 ||
add_iov(c, ITEM_key(it), it->nkey) != 0 ||
add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
{
item_remove(it);
break;
}
}
if (settings.verbose > 1) {
int ii;
fprintf(stderr, ">%d sending key ", c->sfd);
for (ii = 0; ii < it->nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
fprintf(stderr, "\n");
}

/* item_get() has incremented it->refcount for us */
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(it)].get_hits++;
c->thread->stats.get_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_update(it);
*(c->ilist + i) = it;
i++;

} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_misses++;
c->thread->stats.get_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
}

key_token++;
}

/*
* If the command string hasn't been fully processed, get the next set
* of tokens.
*/
if(key_token->value != NULL) {
ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
key_token = tokens;
}

} while(key_token->value != NULL);

c->icurr = c->ilist;
c->ileft = i;
if (return_cas) {
c->suffixcurr = c->suffixlist;
c->suffixleft = i;
}

if (settings.verbose > 1)
fprintf(stderr, ">%d END\n", c->sfd);

/*
If the loop was terminated because of out-of-memory, it is not
reliable to add END\r\n to the buffer, because it might not end
in \r\n. So we send SERVER_ERROR instead.
*/
if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
out_of_memory(c, "SERVER_ERROR out of memory writing get response");
}
else {
conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
}

item_get 은 다음과 같이 구현되어 있다. key 를 hash해서 hv 값을 만든다.
이때 hash 함수는 jenkins hash 와 murmur3가 있는데 기본으로 jenkins hash가 적용된다.
(설정가능)

/*
* Returns an item if it hasn't been marked as expired,
* lazy-expiring as needed.
*/
item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
item_lock(hv);
it = do_item_get(key, nkey, hv);
item_unlock(hv);
return it;
}

먼저 item_lock 코드를 살펴보자.

#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)

void item_lock(uint32_t hv) {
mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

먼저 해당 item 이 들어있는 영역의 Lock을 건다. 여기서 이 item_locks는
memcached_thread_init() 함수의 thread 개수에 따라서 바뀌게 된다. thread
개수가 3개 이하면 1024개의 item_locks 가 생기고 5개 이상이면 8192개의
item_locks 가 생기게 된다.

이 의미는 해당 영역만 lock 이 걸리므로 다른 hash 결과와 연관된 item의 경우는
다른 thread에서 여전히 접근이 가능하다는 것이다.

/*
* Initializes the thread subsystem, creating various worker threads.
*
* nthreads Number of worker event handler threads to spawn
* main_base Event base for main thread
*/
void memcached_thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;

for (i = 0; i < POWER_LARGEST; i++) {
pthread_mutex_init(&lru_locks[i], NULL);
}
pthread_mutex_init(&worker_hang_lock, NULL);

pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);

pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;

/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
/* 8192 buckets, and central locks don't scale much past 5 threads */
power = 13;
}

if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}

item_lock_count = hashsize(power);
item_lock_hashpower = power;

item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can't allocate thread descriptors");
exit(1);
}

dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();

for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}

/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}

/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}

이제 다시 do_item_get() 함수를 살펴보자.

/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(&it->refcount);
/* Optimization for slab reassignment. prevents popular items from
* jamming in busy wait. Can only do this here to satisfy lock order
* of item_lock, slabs_lock. */
/* This was made unsafe by removal of the cache_lock:
* slab_rebalance_signal and slab_rebal.* are modified in a separate
* thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
* NULL (0), but slab_end is still equal to some value, this would end
* up unlinking every item fetched.
* This is either an acceptable loss, or if slab_rebalance_signal is
* true, slab_start/slab_end should be put behind the slabs_lock.
* Which would cause a huge potential slowdown.
* Could also use a specific lock for slab_rebal.* and
* slab_rebalance_signal (shorter lock?)
*/
/*if (slab_rebalance_signal &&
((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
}*/
}
int was_found = 0;

if (settings.verbose > 2) {
int ii;
if (it == NULL) {
fprintf(stderr, "> NOT FOUND ");
} else {
fprintf(stderr, "> FOUND KEY ");
was_found++;
}
for (ii = 0; ii < nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
}

if (it != NULL) {
if (item_is_flushed(it)) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by flush");
}
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by expire");
}
} else {
it->it_flags |= ITEM_FETCHED|ITEM_ACTIVE;
DEBUG_REFCNT(it, '+');
}
}

if (settings.verbose > 2)
fprintf(stderr, "\n");

return it;
}

실제로 item은 다시 assoc_find 함수를 통해서 hashtable에서 item을 가져오게 된다.
item이 있다면, flush 되었는지, expire 가 되었는지 확인해서 이 경우에는 item을 지우고
(정확히는 item 객체를 풀에 반납하고) NULL을 반납하게 된다.

memcached의 경우는 내부적으로 flush_all 이라는 명령이 오면.. 현재 시간을 저장하고, 그것보다
이전에 생성된 데이터의 경우에 (item->time 에 생성시간이 저장됨.) 지워버림. 이것을 flush 라고 함

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}

item *ret = NULL;
int depth = 0;
while (it) {
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}

memcached 에서 primary_hashtable 와 old_hashtable 가 있는데 old_hashtable는
hashtable의 expanding 중에만 존재하는 hashtable이다. 정확히는 확장 전까지의
primary_hashtable이 old_hashtable이 된다. 확장 전의 bucket 이면 old_hashtable에서
데이터를 찾고, 이미 확장된 bucket 은 primary_hashtable 에서 찾는다.

memcached 의 hashtable은 일반적으로 해당 hashtable 안에 linked list 가 들어가 있는
구조이다. 즉 hv 값으로 hashtable을 찾고 거기서는 linked list의 선형 탐색을 하게 되는
것이다. java8에서의 hashmap은 안에 tree 형태로 데이터가 들어가서 많은 데이터를 가지고
있을 경우, 더 빠른 탐색 시간을 보장하는데… memcached 와 redis 등은 hashtable extending
으로 이런 이슈를 해결한다.

이제 hashtable에 저장하는 코드를 살펴보자. process_update_command() 에서 시작하게 된다.

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
char *key;
size_t nkey;
unsigned int flags;
int32_t exptime_int = 0;
time_t exptime;
int vlen;
uint64_t req_cas_id=0;
item *it;

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;

if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

/* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
exptime = exptime_int;

/* Negative exptimes can underflow and end up immortal. realtime() will
immediately expire values that are greater than REALTIME_MAXDELTA, but less
than process_started, so lets aim for that. */
if (exptime < 0)
exptime = REALTIME_MAXDELTA + 1;

// does cas value exist?
if (handle_cas) {
if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
}

vlen += 2;
if (vlen < 0 || vlen - 2 < 0) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

if (settings.detail_enabled) {
stats_prefix_record_set(key, nkey);
}

it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

if (it == 0) {
if (! item_size_ok(nkey, flags, vlen))
out_string(c, "SERVER_ERROR object too large for cache");
else
out_of_memory(c, "SERVER_ERROR out of memory storing object");
/* swallow the data line */
c->write_and_go = conn_swallow;
c->sbytes = vlen;

/* Avoid stale data persisting in cache because we failed alloc.
* Unacceptable for SET. Anywhere else too? */
if (comm == NREAD_SET) {
it = item_get(key, nkey);
if (it) {
item_unlink(it);
item_remove(it);
}
}

return;
}
ITEM_set_cas(it, req_cas_id);

c->item = it;
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
c->cmd = comm;
conn_set_state(c, conn_nread);
}

일단 할당을 위해서 item_alloc() 을 호출하게 됩니다. item_alloc은 do_item_alloc 에서
내부적으로 lock을 사용하므로 따로 lock을 사용하지 않습니다.

item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
item *it;
/* do_item_alloc handles its own locks */
it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
return it;
}

do_item_alloc 에서는 먼저 item 이 사용해야할 size를 구합니다. slabclass가 size 별로
구분되어 있는 걸 기억한다면, 아 이제 slabclass 에서 item 을 가져오겠다는 걸 예상할 수 있습니다.
item_make_header() 는 item이 차지할 size를 알려주는 함수입니다.

slab_alloc을 호출해서 사용할 item 을 가져옵니다.(이 때 실제로 slab에서 할당됩니다.)
여기서는 실제 item 구조체에 data, flags, key등을 셋팅하고 item 구조체를 넘겨줍니다.

item *do_item_alloc(char *key, const size_t nkey, const int flags,
const rel_time_t exptime, const int nbytes,
const uint32_t cur_hv) {
int i;
uint8_t nsuffix;
item *it = NULL;
char suffix[40];
unsigned int total_chunks;
size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
if (settings.use_cas) {
ntotal += sizeof(uint64_t);
}

unsigned int id = slabs_clsid(ntotal);
if (id == 0)
return 0;

/* If no memory is available, attempt a direct LRU juggle/eviction */
/* This is a race in order to simplify lru_pull_tail; in cases where
* locked items are on the tail, you want them to fall out and cause
* occasional OOM's, rather than internally work around them.
* This also gives one fewer code path for slab alloc/free
*/
for (i = 0; i < 5; i++) {
/* Try to reclaim memory first */
if (!settings.lru_maintainer_thread) {
lru_pull_tail(id, COLD_LRU, 0, false, cur_hv);
}
it = slabs_alloc(ntotal, id, &total_chunks, 0);
if (settings.expirezero_does_not_evict)
total_chunks -= noexp_lru_size(id);
if (it == NULL) {
if (settings.lru_maintainer_thread) {
lru_pull_tail(id, HOT_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, WARM_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, COLD_LRU, total_chunks, true, cur_hv);
} else {
lru_pull_tail(id, COLD_LRU, 0, true, cur_hv);
}
} else {
break;
}
}

if (i > 0) {
pthread_mutex_lock(&lru_locks[id]);
itemstats[id].direct_reclaims += i;
pthread_mutex_unlock(&lru_locks[id]);
}

if (it == NULL) {
pthread_mutex_lock(&lru_locks[id]);
itemstats[id].outofmemory++;
pthread_mutex_unlock(&lru_locks[id]);
return NULL;
}

assert(it->slabs_clsid == 0);
//assert(it != heads[id]);

/* Refcount is seeded to 1 by slabs_alloc() */
it->next = it->prev = it->h_next = 0;
/* Items are initially loaded into the HOT_LRU. This is '0' but I want at
* least a note here. Compiler (hopefully?) optimizes this out.
*/
if (settings.lru_maintainer_thread) {
if (exptime == 0 && settings.expirezero_does_not_evict) {
id |= NOEXP_LRU;
} else {
id |= HOT_LRU;
}
} else {
/* There is only COLD in compat-mode */
id |= COLD_LRU;
}
it->slabs_clsid = id;

DEBUG_REFCNT(it, '*');
it->it_flags = settings.use_cas ? ITEM_CAS : 0;
it->nkey = nkey;
it->nbytes = nbytes;
memcpy(ITEM_key(it), key, nkey);
it->exptime = exptime;
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
return it;
}

그런데 뭔가 이상한 걸 느끼지 못했나요? get 은 실제 hashtable에서 데이터를 가져왔는데…
왜 set 은 실제 데이터를 넣는 부분이 없을까요? 실제 memcached 코드에서는 이 부분이 나눠져
있습니다. 먼저 memcached text 프로토콜에서 set 프로토콜을 한번 살펴봅시다.
먼저 set 명령은 2 line으로 구분되고 첫번째 라인은 다음과 같이 구성됩니다.

SET key [flags] [exptime] length [noreply]

중간에 length 가 보이지요? 그럼 그 다음중에는 무엇이 와야 할까요? 네… 정답입니다.
위에서 length 지정한 크기 만큼의 실제 데이터죠. 그래서 process_update_command의 마지막에서
다음과 같이 conn_set_state 함수를 통해서 현재 connection의 상태를 conn_nread로 만들어둡니다.

c->item = it;
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
c->cmd = comm;
conn_set_state(c, conn_nread);

그러면 실제 event_handler 에서 conn_nread 상태에서는 다음과 같이 호출되게 됩니다.

case conn_nread:
if (c->rlbytes == 0) {
complete_nread(c);
break;
}

/* Check if rbytes < 0, to prevent crash */
if (c->rlbytes < 0) {
if (settings.verbose) {
fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
}
conn_set_state(c, conn_closing);
break;
}

/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
if (c->ritem != c->rcurr) {
memmove(c->ritem, c->rcurr, tocopy);
}
c->ritem += tocopy;
c->rlbytes -= tocopy;
c->rcurr += tocopy;
c->rbytes -= tocopy;
if (c->rlbytes == 0) {
break;
}
}

/* now try reading from the socket */
res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.bytes_read += res;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (c->rcurr == c->ritem) {
c->rcurr += res;
}
c->ritem += res;
c->rlbytes -= res;
break;
}
if (res == 0) { /* end of stream */
conn_set_state(c, conn_closing);
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
stop = true;
break;
}
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0) {
fprintf(stderr, "Failed to read, and not due to blocking:\n"
"errno: %d %s \n"
"rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
errno, strerror(errno),
(long)c->rcurr, (long)c->ritem, (long)c->rbuf,
(int)c->rlbytes, (int)c->rsize);
}
conn_set_state(c, conn_closing);
break;

conn_nread 에서는 c->rlbytes 값이 0이 될때까지 계속 이벤트가 발생할 때 마다
read 함수를 통해서 데이터를 읽어들이게 됩니다. 그리고 c->rlbytes == 0이면
필요한 만큼의 데이터를 읽었다는 의미이므로 complete_nread()를 호출합니다.

static void complete_nread(conn *c) {
assert(c != NULL);
assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);

if (c->protocol == ascii_prot) {
complete_nread_ascii(c);
} else if (c->protocol == binary_prot) {
complete_nread_binary(c);
}
}

이제 complete_nread_ascii 함수에서 store_item()을 호출해서 최종 마무리 작업을 하게 됩니다.

/*
* we get here after reading the value in set/add/replace commands. The command
* has been stored in c->cmd, and the item is ready in c->item.
*/
static void complete_nread_ascii(conn *c) {
assert(c != NULL);

item *it = c->item;
int comm = c->cmd;
enum store_item_type ret;

pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);

if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
ret = store_item(it, comm, c);

switch (ret) {
case STORED:
out_string(c, "STORED");
break;
case EXISTS:
out_string(c, "EXISTS");
break;
case NOT_FOUND:
out_string(c, "NOT_FOUND");
break;
case NOT_STORED:
out_string(c, "NOT_STORED");
break;
default:
out_string(c, "SERVER_ERROR Unhandled storage type.");
}
}

item_remove(c->item); /* release the c->item reference */
c->item = 0;
}

store_item 은 다시 item_lock을 걸고 do_store_item 을 호출합니다.

/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
enum store_item_type store_item(item *item, int comm, conn* c) {
enum store_item_type ret;
uint32_t hv;

hv = hash(ITEM_key(item), item->nkey);
item_lock(hv);
ret = do_store_item(item, comm, c, hv);
item_unlock(hv);
return ret;
}

do_store_item 은 좀 더 복잡한 작업을 합니다. 최초의 함수명을 기억하시나요?
process_update_command 입니다. 즉 없던 item이 추가되는 경우만 아니라, 기존의 item 이
바뀌는 경우도 당연히 발생합니다.

그래서 먼저 do_item_get() 을 호출해서 item을 찾습니다. 일단은 기존에 item 이 없던
새로운 item이 추가되는 경우만 살펴보도록 하겠습니다.

/*
* Stores an item in the cache according to the semantics of one of the set
* commands. In threaded mode, this is protected by the cache lock.
*
* Returns the state of storage.
*/
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
char *key = ITEM_key(it);
item *old_it = do_item_get(key, it->nkey, hv);
enum store_item_type stored = NOT_STORED;

item *new_it = NULL;
int flags;

if (old_it != NULL && comm == NREAD_ADD) {
/* add only adds a nonexistent item, but promote to head of LRU */
do_item_update(old_it);
} else if (!old_it && (comm == NREAD_REPLACE
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
} else if (comm == NREAD_CAS) {
/* validate cas operation */
if(old_it == NULL) {
// LRU expired
stored = NOT_FOUND;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.cas_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);
}
else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
// cas validates
// it and old_it may belong to different classes.
// I'm updating the stats for the one that's getting pushed out
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);

item_replace(old_it, it, hv);
stored = STORED;
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
pthread_mutex_unlock(&c->thread->stats.mutex);

if(settings.verbose > 1) {
fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
(unsigned long long)ITEM_get_cas(old_it),
(unsigned long long)ITEM_get_cas(it));
}
stored = EXISTS;
}
} else {
/*
* Append - combine new and old record into single one. Here it's
* atomic and thread-safe.
*/
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
/*
* Validate CAS
*/
if (ITEM_get_cas(it) != 0) {
// CAS much be equal
if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
stored = EXISTS;
}
}

if (stored == NOT_STORED) {
/* we have it and old_it here - alloc memory to hold both */
/* flags was already lost - so recover them from ITEM_suffix(it) */

flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);

if (new_it == NULL) {
/* SERVER_ERROR out of memory */
if (old_it != NULL)
do_item_remove(old_it);

return NOT_STORED;
}

/* copy data from it and old_it to new_it */

if (comm == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
} else {
/* NREAD_PREPEND */
memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
}

it = new_it;
}
}

if (stored == NOT_STORED) {
if (old_it != NULL)
item_replace(old_it, it, hv);
else
do_item_link(it, hv);

c->cas = ITEM_get_cas(it);

stored = STORED;
}
}

if (old_it != NULL)
do_item_remove(old_it); /* release our reference */
if (new_it != NULL)
do_item_remove(new_it);

if (stored == STORED) {
c->cas = ITEM_get_cas(it);
}

return stored;
}

기존 item 이 있다면(update), item_replace 이 호출되고, 새로운 item(set) 이라면
do_item_link 가 호출되게 됩니다.

int do_item_link(item *it, const uint32_t hv) {
MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
it->it_flags |= ITEM_LINKED;
it->time = current_time;

STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
STATS_UNLOCK();

/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
assoc_insert(it, hv);
item_link_q(it);
refcount_incr(&it->refcount);

return 1;
}

여기서 최종적으로 assoc_insert 를 호출하면서 hashtable에 데이터가 저장됩니다.
그리고 아까 언급했던 item->time 에 현재 시간이 들어갑니다. assoc_find 와 거의 동일합니다.
hv 값을 통해서 hashtable의 index를 구하고 거기 첫 값을 it->h_next 로 설정하고,
해당 table의 첫번째 값을 새로운 item으로 설정합니다. 즉, hashtable의 head로 새로운
item이 들어가는 거죠. 이때 hash_items 가 (hashsize(hashpower) * 3) / 2 보다 커지면
두배로 확장하게 됩니다.

/* Note: this isn't an assoc_update. The key must not already exist to call this */
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;

// assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}

pthread_mutex_lock(&hash_items_counter_lock);
hash_items++;
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
pthread_mutex_unlock(&hash_items_counter_lock);

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}

 

[입 개발] memcached 소스 분석 #1

slab

MAX_NUMBER_OF_SLAB_CLASSES = 63 + 1로 정의됨

static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];

slabclass_t 구조체는 다음과 같다. slabclass 에서 list_size 는
slab_list 의 capacity 이다. 그리고 slabs는 현재 몇개의 slab이
할당되었는지는 나타낸다.(현재의 length)

typedef struct {
unsigned int size; /* sizes of items */
unsigned int perslab; /* how many items per slab */

void *slots; /* list of item ptrs */
unsigned int sl_curr; /* total free items in list */

unsigned int slabs; /* how many slabs were allocated for this class */

void **slab_list; /* array of slab pointers */
unsigned int list_size; /* size of prev array */

size_t requested; /* The number of requested bytes */
} slabclass_t;

memcache 는 시작시에 factor 값에 의해서 사이즈 Range 별로 slabclass id를 가지도록
설정한다. slabs_preallocate 을 이용해서 미리 메모리를 할당해둘 수 도 있다.
이걸 이용하지 않으면 chunk 가 필요할 때 chunk를 할당한다. 기본 chunk size는 1MB이다.

#define CHUNK_ALIGN_BYTES 8

CHUNK_ALIGN_BYTES = 8

void slabs_init(const size_t limit, const double factor, const bool prealloc) {
int i = POWER_SMALLEST - 1;
//default chunk_size = 48
//sizeof(item) = 48
//factor = 1.25
//item_size_max = 1024 * 1024
//POWER_SMALLEST = 1

unsigned int size = sizeof(item) + settings.chunk_size;

mem_limit = limit;

if (prealloc) {
/* Allocate everything in a big chunk with malloc */
mem_base = malloc(mem_limit);
if (mem_base != NULL) {
mem_current = mem_base;
mem_avail = mem_limit;
} else {
fprintf(stderr, "Warning: Failed to allocate requested memory in"
" one large chunk.\nWill allocate in smaller chunks\n");
}
}

memset(slabclass, 0, sizeof(slabclass));

while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1 && size <= settings.item_size_max / factor) {
/* Make sure items are always n-byte aligned */
if (size % CHUNK_ALIGN_BYTES)
size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

slabclass[i].size = size;
slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
size *= factor;
if (settings.verbose > 1) {
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}
}

power_largest = i;
slabclass[power_largest].size = settings.item_size_max;
slabclass[power_largest].perslab = 1;
if (settings.verbose > 1) {
fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
i, slabclass[i].size, slabclass[i].perslab);
}

/* for the test suite: faking of how much we've already malloc'd */
{
char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
if (t_initial_malloc) {
mem_malloced = (size_t)atol(t_initial_malloc);
}

}

if (prealloc) {
slabs_preallocate(power_largest);
}
}

slab size 초기화는 다음과 같이 이루어진다. 이 의미는 size가 96까지를 다루는 slabclass
는 그 안에 10922개의 아이템을 저장할 수 있다는 의미이다.

size: 96 perslab: 10922
size: 120 perslab: 8738
size: 152 perslab: 6898
size: 192 perslab: 5461
size: 240 perslab: 4369
size: 304 perslab: 3449
size: 384 perslab: 2730
size: 480 perslab: 2184
size: 600 perslab: 1747
size: 752 perslab: 1394
size: 944 perslab: 1110
size: 1184 perslab: 885
size: 1480 perslab: 708
size: 1856 perslab: 564
size: 2320 perslab: 451
size: 2904 perslab: 361
size: 3632 perslab: 288
size: 4544 perslab: 230
size: 5680 perslab: 184
size: 7104 perslab: 147
size: 8880 perslab: 118
size: 11104 perslab: 94
size: 13880 perslab: 75
size: 17352 perslab: 60
size: 21696 perslab: 48
size: 27120 perslab: 38
size: 33904 perslab: 30
size: 42384 perslab: 24
size: 52984 perslab: 19
size: 66232 perslab: 15
size: 82792 perslab: 12
size: 103496 perslab: 10
size: 129376 perslab: 8
size: 161720 perslab: 6
size: 202152 perslab: 5
size: 252696 perslab: 4
size: 315872 perslab: 3
size: 394840 perslab: 2
size: 493552 perslab: 2
size: 616944 perslab: 1
size: 771184 perslab: 1

이렇게 만들어진 slabclass 는 slabs_clsid 를 통해서 접근 할 수 있다. 인덱스를
하나씩 증가하면 적절한 크기의 slabclass를 선택한다.

unsigned int slabs_clsid(const size_t size) {
int res = POWER_SMALLEST;

if (size == 0)
return 0;
while (size > slabclass[res].size)
if (res++ == power_largest) /* won't fit in the biggest slab */
return 0;
return res;
}

slabclass는 item 과도 관계가 있다. 실제 slab_alloc 은 item_alloc 이 호출될때
호출되게 된다.

typedef unsigned int rel_time_t;

typedef struct _stritem {
/* Protected by LRU locks */
struct _stritem *next;
struct _stritem *prev;
/* Rest are protected by an item lock */
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
/* this odd type prevents type-punning issues when we do
* the little shuffle to save space when not using CAS. */
union {
uint64_t cas;
char end;
} data[];
/* if it_flags & ITEM_CAS we have 8 bytes CAS */
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

최초 item 할당시에 당연히 slab 도 없기 때문에, do_slabs_newslab()를 호출하게 된다.
SLAB_GLOBAL_PAGE_POOL 는 Reassignment를 위한 것이므로 최초에는 slabclass만 존재하고
내부에 할당된 slab은 없음. get_page_from_global_pool() 에서도 데이터가 없으므로 실제
메모리 할당은 memory_allocate 에 의해서 이루어진다.

static int do_slabs_newslab(const unsigned int id) {
slabclass_t *p = &slabclass[id];
slabclass_t *g = &slabclass[SLAB_GLOBAL_PAGE_POOL];
int len = settings.slab_reassign ? settings.item_size_max
: p->size * p->perslab;
char *ptr;

if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0
&& g->slabs == 0)) {
mem_limit_reached = true;
MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
return 0;
}

if ((grow_slab_list(id) == 0) ||
(((ptr = get_page_from_global_pool()) == NULL) &&
((ptr = memory_allocate((size_t)len)) == 0))) {

MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
return 0;
}

memset(ptr, 0, (size_t)len);
split_slab_page_into_freelist(ptr, id);

p->slab_list[p->slabs++] = ptr;
MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

return 1;
}

할당된 메모리 ptr은 split_slab_page_into_freelist 에 의해서 초기화 된다.

static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
slabclass_t *p = &slabclass[id];
int x;
for (x = 0; x < p->perslab; x++) {
do_slabs_free(ptr, 0, id);
ptr += p->size;
}
}

p->size는 slabclass item의 크기이므로 그 값만큼 증가하면서 do_slabs_free 를 호출해서
item을 저장할 수 있는 형태로 정보를 저장한다. ptr 이 p->size 만큼 계속 증가하는데 주목하자.

static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
slabclass_t *p;
item *it;

assert(id >= POWER_SMALLEST && id <= power_largest);
if (id < POWER_SMALLEST || id > power_largest)
return;

MEMCACHED_SLABS_FREE(size, id, ptr);
p = &slabclass[id];

it = (item *)ptr;
it->it_flags = ITEM_SLABBED;
it->slabs_clsid = 0;
it->prev = 0;
it->next = p->slots;
if (it->next) it->next->prev = it;
p->slots = it;

p->sl_curr++;
p->requested -= size;
return;
}

p는 slabclass, it는 해당 ptr의 메모리 영역이다. p->slots는 기존에 할당된 item list를
가리키는 포인터이다. 즉 it->next 로 현재의 it->next 에 기존의 item list를 저장하고,
기존의 item list의 prev는 새롭게 추가될 it가 된다. 그리고 다시 p->slots은 it를 가리키게
되므로, linked list로 slabclass에 할당되는 모든 item들이 double linked list 형식으로
저장되게 된다.(실제로 메모리 할당은 item_max_size 형태로 할당되지만… 논리적으로 이어진다.)