[입 개발] Redis 7.x 에서의 ShardedPubSub

오래간만에 Redis 7에서 도입될 Sharded PubSub 에 대해서 좀 분석을 해보게 되었습니다. Sharded PubSub 은 기존 Redis Cluster 에서의 PubSub 의 단점을 해결하기 위해서 도입된 기술입니다. 먼저 Sharded Pub/Sub을 이야기하기 전에 Redis Cluster에서의 PubSub을 알아보고 단점을 먼저 확인해봅시다.

기본적으로 Redis Cluster 에서의 Pub/Sub은 모든 노드에 데이터를 뿌리게 됩니다. 그래서 Client #1이 한대의 서버에 publish를 하면 해당 노드는 모든 노드(Primary + Replica)들에 publish를 broadcast 하게 되고, 어떤 노드에 subscribe를 하든 해당 broadcast 되는 메시지를 받을 수 있습니다.

이 broadcast 는 클라이언트가 Primary에 subscribe 를 하든 Replica 에 subscribe 를 하든 상관없습니다. 물론 publish 역시 상관이 없습니다.

그리고 subscribe 하는 클라이언트는 해당 서버에 접속을 하게 되죠. 일단 Redis에서의 pub/sub의 메시지 전달은 at most once 입니다. 즉, 최대 한번, 또는 못받을 수가 있는 구조죠.

그럼 뭐가 성능상 문제냐? 라고 물으신다면, 항상 broadcast 하는 것이 문제입니다. 안그래도 Redis Pub/Sub 자체가 pattern 이라는 걸 지원해서, 모든 채널을 확인해야 하는 이슈가 있기 때문에, 보통 메시지 전달은 채널수 + 채널에 붙은 클라이언트 수 만큼 루프를 돌아야 합니다. 항상 모든 서버에 일단 broadcast 해야 한다는게 그래서 전체 성능에 영향을 미칠 수 가 있는거죠.(Redis는 Single Threaded 라 항상 오래 동작하는 기능은 조심해야 하는…), 클러스터가 커지면 커질수록 문제가 더 발생할 수 있습니다.

그럼 Sharded Pub/Sub은 뭐냐? 말 그대로입니다. Sharded 해서 특정 노드에만 해당 채널의 정보를 publish 하도록 하자가 됩니다. 이 구조는 보통 다음과 같습니다.

이는 기존 Redis Cluster 의 특성을 그대로 이용합니다. 일반적으로 key를 crc16으로 Hash 해서 해당 키가 속한 slot 을 처리하는 서버로만 메시지를 전달하게 되는데 (-MOVED 를 이용합니다.), 이 특성을 이용해서 Pub/Sub도 하나의 Shard 군에서만 처리하자 입니다.(제가 노드가 아니라 Shard 군이라고 한 것에 주의하세요.)

먼저 코드부터 확인해보도록 하겠습니다. pubsubtype 이라는 구조체가 ShardedPubSub을 지원하기 위해서 도입이 되었습니다.

typedef struct pubsubtype {
    int shard;
    dict *(*clientPubSubChannels)(client*);
    int (*subscriptionCount)(client*);
    dict **serverPubSubChannels;
    robj **subscribeMsg;
    robj **unsubscribeMsg;
}pubsubtype;

위의 pubsubtype 으로 두 개의 타입이 생성되어 있음. 기본인 pubSubType 과 ShardedPubSub을 위한 pubSubShardType 은 다음과 같습니다.

pubsubtype pubSubType = {
    .shard = 0,
    .clientPubSubChannels = getClientPubSubChannels,
    .subscriptionCount = clientSubscriptionsCount,
    .serverPubSubChannels = &server.pubsub_channels,
    .subscribeMsg = &shared.subscribebulk,
    .unsubscribeMsg = &shared.unsubscribebulk,
};

/*
 * Pub/Sub type for shard level channels bounded to a slot.
 */
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk
};

이를 지원하기 위해서 redisServer 구조체에도 sharded channels을 구독하는 client의 정보를 저장하고 있는 변수가 추가됨.

dict *pubsubshard_channels;  /* Map channels to list of subscribed clients */
int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */

client 구조체에도 이를 지원하기 위한 변수가 추가됨

dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */

여기서 주의해서 봐야 하는 것은 아래처럼 pubsub_channels 과 pubsubshard_channels 가 keylistDictType 으로 구성되어 있다는 것입니다.

server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
server.pubsubshard_channels = dictCreate(&keylistDictType);

getClientPubSubChannels 와 getClientPubSubShardChannels 는 아래와 같이 서로 다른 변수를 전달함.

dict* getClientPubSubChannels(client *c) {
    return c->pubsub_channels;
}

dict* getClientPubSubShardChannels(client *c) {
    return c->pubsubshard_channels;
}

clientSubscriptionsCount 와 clientShardSubscriptionsCount 는 다음과 같다.

/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
    return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
}

/* Return the number of shard level channels a client is subscribed to. */
int clientShardSubscriptionsCount(client *c) {
    return dictSize(c->pubsubshard_channels);
}

먼저 subscribe 하는 코드를 살펴보면 다음과 같다. sharded pubsub 을 위해서 ssubscribe 라는 명령이 존재한다. 코드를 보면 cluster_enabled 상태에서는 해당 채널이 등록되어 있지 않으면 slotToChannelAdd 명령을 통해서 cluster 의 slot_to_channels 에 추가하게 됩니다.

/* SSUBSCRIBE channel [channel ...] */
void ssubscribeCommand(client *c) {
    if (c->flags & CLIENT_DENY_BLOCKING) {
        /* A client that has CLIENT_DENY_BLOCKING flag on
         * expect a reply per command and so can not execute subscribe. */
        addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
        return;
    }

    for (int j = 1; j < c->argc; j++) {
        /* A channel is only considered to be added, if a
         * subscriber exists for it. And if a subscriber
         * already exists the slotToChannel doesn't needs
         * to be incremented. */
        if (server.cluster_enabled &
            (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
            slotToChannelAdd(c->argv[j]->ptr);
        }
        pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
    }
    c->flags |= CLIENT_PUBSUB;
}

그리고

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(*type.serverPubSubChannels, channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel,type);
    return retval;
}

publish 는 spublish 라는 전용 커맨드가 있습니다. publishCommand 는 clusterPropagatePublish 라는 모든 Shard에 데이터를 보내는 것과 달리 spublishCommand 는 clusterPropagatePublishShard 함수는 해당 shard 에만 메세지를 보내게 됩니다.(이 부분이 가장 큰 차이입니다.) – 기존의 Pub/Sub은 모든 노드에 데이터를 보내기 때문에 아무 노드에만 Subscribe 를 해도 메시지를 전달 받을 수 있었지만, Sharded Pub/Sub에서는 이게 불가능합니다.

/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
    int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
    if (server.cluster_enabled) {
        clusterPropagatePublishShard(c->argv[1], c->argv[2]);
    } else {
        forceCommandPropagation(c,PROPAGATE_REPL);
    }
    addReplyLongLong(c,receivers);
}

여기서 살짝 publishCommand 함수를 살펴보면(Sharded PubSub이 아닌 경우) 다음과 같습니다. 거의 동일합니다.

/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
    if (server.sentinel_mode) {
        sentinelPublishCommand(c);
        return;
    }

    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

pubsubPublishMessageInternal 는 세 번째 파라매터로 처음에 소개한 pubsubtype 을 받게 되고 코드는 다음과 같습니다. pattern 을 사용하는 방식은 여기 코드에서도 나오지만 shard type을 지원하지 않습니다. 이는 pattern형식은 전체 노드를 subscribe 하지 않고서는 지원할 수 없기 때문입니다. (패턴의 hash 값이 어떤 slot에 들어갈지를 알 수 없음.)

/*
 * Publish a message to all the subscribers.
 */
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(*type.serverPubSubChannels, channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message);
            updateClientMemUsage(c);
            receivers++;
        }
    }

    if (type.shard) {
        /* Shard pubsub ignores patterns. */
        return receivers;
    }

    /* Send to clients listening to matching channels */
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
        channel = getDecodedObject(channel);
        while((de = dictNext(di)) != NULL) {
            robj *pattern = dictGetKey(de);
            list *clients = dictGetVal(de);
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;

            listRewind(clients,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                updateClientMemUsage(c);
                receivers++;
            }
        }
        decrRefCount(channel);
        dictReleaseIterator(di);
    }

여기서 볼 부분은 실제로 위의 해당 channel 이 속한 cluster 로 clusterSendPublish를 통해서 전달되게 됩니다. 여기서 clusterGetNodesServingMySlots 에서는 해당 slot 을 가진 primary 와 replica를 모두 전달받게 됩니다. 즉, 보통 최소 2개 정도가 정상입니다.

/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub shard support
 *
 * Publish this message across the slot (primary/replica).
 * -------------------------------------------------------------------------- */
void clusterPropagatePublishShard(robj *channel, robj *message) {
    list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
    if (listLength(nodes_for_slot) != 0) {
        listIter li;
        listNode *ln;
        listRewind(nodes_for_slot, &li);
        while((ln = listNext(&li))) {
            clusterNode *node = listNodeValue(ln);
            if (node != myself) {
                clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
            }
        }
    }
    listRelease(nodes_for_slot);
}

clusterSendPublish 함수는 다음과 같이 publish 메세지를 해당 slot 을 가진 노드에 전달합니다.

/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster.
 *
 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
 * As all the struct is used as a buffer, when more than 8 bytes are copied into
 * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
 * positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
    unsigned char *payload;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,type);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = (unsigned char*)buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)buf) zfree(payload);
}

그래서 Sharded Pub/Sub 은 ssubscribe, spublish 등의 명령을 가지고 있습니다. 그래서 테스트를 해보면 다음과 같은 결과를 볼 수 있습니다. ssubscribe test 를 하면 7003 으로 -MOVED가 전송됩니다. 7003으로 접속해서 다시 ssubscribe를 하면 성공함을 볼 수 있습니다.

➜  redis git:(unstable) ✗ telnet 0 7005
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  redis git:(unstable) ✗ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
*3
$10
ssubscribe
$4
test
:1

spublish 도 마찬가지 입니다.

➜  ~  telnet 0 7002
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  ~ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
:1

실제로 spublish 를 하면 이제 해당 shard 군만 받게 되는 것을 볼 수 있습니다. 이로 인해서 broadcast 를 줄여서, 실제 부하를 줄이겠다는게 Redis 7.x에서 나오는 Sharded Pub/Sub 입니다. 다만 제가 보기에는 Sharded Pub/Sub도 명확한 한계가 있는 방식입니다. 물론, 이전의 방식이 더 좋다는 것은 아니지만, Redis 가 진짜 대규모의 Pub/Sub을 처리하기 위해서는 좀 더 많은 구조적인 개선이 필요할듯 합니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #3

로그를 분석하거나 저장할 필요가 없는 서비스면 상관이 없겠지만, 결국 대부분의 서비스는 로그를 저장하고 이를 통해서 서비스를 상황을 분석하게 됩니다.(혼자서 테스트로만 쓰는 서비스면 그럴 필요가 없죠.) 그래서 이제 마지막으로 어떻게 s3에 저장할 것인가에 대해서 얘기를 할려고 합니다. 2부에서도 보였지만, 아래 형태 처럼 S3에 저장이되면 EMR, Athena 등을 이용해서 처리를 할 수 있습니다.

가장 쉬운 방법은 해당 CloudWatch Log를 S3로 덤프하는 것입니다. 아래 처럼 CloudWatch 에서 Export data to Amazon S3를 통해서 덤프를 하는 것입니다.

해당 메뉴를 선택하고, 덤프할 시간대를 지정하면 해당 경로에 덤프가 되게 됩니다. 여기에 주의할 것은 CloudWatch Log Stream 은 로컬 시간으로 보여지지만, 덤프는 UTC 기준으로 설정해야 한다는 부분입니다.

가장 쉽고, 쉽게 생성할 수 있습니다. API로도 쉽게 가능한데, 단점(?)은 로그가 cloudwatch에서 보이는 형태로 그대로 저장됩니다. 예를 들어 우리의 Log 자체는 json 형태인데 cloud watch 에는 시간 날짜 서버 주소 등이 추가되어서 다음과 같은 형태로 저장되게 됩니다.

2022-02-24T15:15:41.565Z Feb 25 00:15:41 ip-10-10-104-20 web: {"timestamp": "2022-02-25T00:15:41" 어쩌구 저쩌구 원래 로그}

그리고 실시간이 아니라 배치 형태입니다. 물론 (1분 단위로 저장하면 될것 같긴 한데…) 그리고 로그 구조도 위의 s3 bucket prefix 설정 이후에 {uuid}/{logstream name}/000000.gz 이런식으로 저장되게 됩니다.

그럼 실시간으로 하고 싶다면? kinesis firehose delivery stream 을 사용할 수 있습니다. create delivery stream 을 통해서 firehose delivery stream 을 생성할 수 있습니다.

이때 기본적으로 Source를 DIRECT_PUT으로 설정하고Destination 을 Amazon S3 로 설정하면 됩니다.

그리고 어디에 저장할지 S3 Bucket 과 S3 bucket Prefix 를 지정하면 됩니다. 항상 저 prefix 뒤에 YYYY/MM/dd/HH 가 붙게됩니다.(UTC 기준입니다.)

이런 작업을 하기 전에 해당 firehose 가 s3에 쓰기 위한 IAM 과 cloudwatch에서 해당 firehose delivery stream 을 쓰기 위한 IAM을 두 개 생성해야 합니다.

그리고 이제 cloudwatch에서 Subscription Filters 를 이용해서 로그를 firehose 에 준 실시간(몇분 정도의 데이터가 모여서 전달이 되는 걸로 보입니다.) 형태로 아까 지정한 s3 경로에 데이터가 저장되게 됩니다.)

그런데 문제는 Subscription Filters 형태로 그냥 저장을 해버리면, 로그 포맷이 다음과 같은 형태입니다.

{"messageType":"DATA_MESSAGE","owner":"171305364530","logGroup":"/aws/elasticbeanstalk/myservervice/var/log/myservice/reqlog.log","logStream":"myservice/i-0d93c1be6e7f0b515","subscriptionFilters":["adminlog_to_s3"],"logEvents":[{"id":"36685096528927476510604570585254667982719470768821960704","timestamp":1645016621747,"message":"{\"logType\":\"BasicErrorController#error\",\"server_timestamp\":1645016613084,\"remote_ip\":\"20.114.132.182\",\"userId\":-1,\"url\":\"/error\",\"status\":500}"}]}{"messageType":"DATA_MESSAGE","owner":"171305364530","logGroup":"/aws/elasticbeanstalk/myservice/var/log/myservice/reqlog.log","logStream":"i-0d93c1be6e7f0b515","subscriptionFilters":["adminlog_to_s3"],"logEvents":[{"id":"36685096528927476510604570585230365759195078421286944768","timestamp":1645016621747,"message":"{\"logType\":\"BasicErrorController#error\",\"server_timestamp\":1645016613084,\"remote_ip\":\"20.114.132.182\",\"userId\":-1,\"url\":\"/error\",\"status\":500}"}]}

Subscription Filter에서 저런 형태로 데이터를 Firehose로 넘겨주게됩니다. logEvents 안에 몇개의 로그들이 배열로 들어가 있고, 그 중에 message 부분에 실제 저장한 로그가 들어가 있습니다.. (다음 로그도 합쳐져서 저장이 되어서 json 파싱도 어려운…)

그러면 이걸 어떻게 하는가? 실제로 Subscription Filter를 lambda로 만들어서 lambda로 데이터를 보내고, 해당 데이터만 추출해서 데이터를 저장하는 형태로 사용하면 원래 파일에 남아있던 로그만 저장할 수 가 있습니다.

실제로 이 부분을 만들어야 하나 싶었는데, 다음을 이용하면 바로 aws lambda 로 설정이 됩니다. https://github.com/jlhood/json-lambda-logs-to-kinesis-firehose

코드를 보면 실제로 특별한 작업을 하는 것은 아니고 다음과 같습니다. 즉 Subscription Filter로 인해서 발생한 로그가 AWS Lambda의 handler 를 호출하면, 아까 Firehose 로그 형식을 읽어서 파싱합니다. logEvents 부분을 읽어서 개별로 FIREHOSE의 해당 경로에 저장하게 됩니다.

FIREHOSE = boto3.client('firehose')


def handler(event, context):
    """Forward JSON-formatted CW Log events to Firehose Delivery Stream."""
    LOG.debug('Received event: %s', event)

    log_messages = _get_log_messages(event)
    for log_message in log_messages:
        if _is_json(log_message):
            if not log_message.endswith('\n'):
                log_message += '\n'

            FIREHOSE.put_record(
                DeliveryStreamName=config.DELIVERY_STREAM_NAME,
                Record={
                    'Data': log_message
                }
            )


def _get_log_messages(event):
    data = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data'])))
    return [log_event['message'] for log_event in data['logEvents']]

위의 Lambda 를 등록하면 이제 해당 로그가 다음 같은 구조로 흐르게 됩니다.

개인적으로는 실시간이 필요없다면 그냥 export 방식이 가장 수월해 보입니다. (필요하면 주기를 좀 줄여서 실시간 처럼 보일 수도…)

저렇게 S3에 저장이 된 후에 EMR등을 이용해서 분석을 할 수 있습니다. 아마 초반에는 실시간 데이터는 필요 없을듯 하니, 최대한 간단하게 가져갈 수 있는게 좋을듯 합니다. 다만… 세상이 그렇게 쉽지 많은 않은… 보안 로그도 관리해야하고… 핀테크는 제약이 좀 더 많네요.

스타트업에서의 목표는 최소 비용(사람의 노력이 가장 큰 비용입니다.)으로 어느정도의 효과를 뽑아내는 거라고 생각합니다.(최대가 아닙니다.)

데이터 엔지니어어 생활도 오래한거 같지만… 데이터쪽은 항상 뭔가 할려고 하니 어렵네요. 일단은 이렇게 땜방을 치고… 나중에 좀 더 고도화를 해야…

이렇게 로그를 수집하려는 이유는, 일단은 에러로그 같은 걸 좀 쉽게 보기 위해서… DB를 최대한 안 읽고, 필요한 현재 상태를 파악하기 위해서 입니다. 사실 로그에서 가장 중요한 것은, 어떻게 구축하냐가 아니라, 어떤 정보를 남기는가를 정의하는 것입니다. DB에 있는 데이터를 계속 접근해야 하면, 데이터 량이 작을때는 별 문제가 아니지만, 나중에는 DB에서 그 데이터를 읽기 위해서 많은 비용을 들이게 됩니다.(단점은 로그가 안정적인가? – 유실은 없을까 등… 고민이 많습니다.)

어서 빨리 서비스가 성장해서 데이터 처리에 더 많은 고민을 할 수 있으면 좋겠습니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #2

1부에서는 CloudWatch Log Stream 에 데이터를 저장하는 것 까지 살펴보았습니다. 그런데 그렇게 로그를 보낸다고 하면 고민해야 할 부분들이 있습니다.

  • 로그 파일의 사이즈
    • /var/log/myservice/myservice.log 에 파일을 저장한다고 가정을 하겠습니다. 그러면 해당 파일에 계속 로그가 쌓이다 보면 어떤 문제가 발생하게 될까요?
    • 계속 데이터가 쌓이다보면, 결국 해당 디스크의 디스크 용량을 서비스 로그가 전부 디스크를 사용하게 되면, 해당 인스턴스에 문제가 발생하게 될 것입니다.

그렇다면 위의 문제를 기존에 beanstalk 에서는 어떻게 사용하고 있을까요? beanstalk의 기본 설정으로는 logrotate 를 사용하고 있습니다. logrotate 에 대한 기본 설명 자체는 아래 블로그가 잘 설명하고 있어서 이것으로 대체하도록 하겠습니다.

https://velog.io/@gillog/logrotate

해당 설정은 beanstalk를 사용하면 /etc/logrotate.elasticbeanstalk.hourly 를 보면 확인할 수 있습니다.

[ec2-user@ ~]$ cd /etc/logrotate.elasticbeanstalk.hourly/
[ec2-user@ logrotate.elasticbeanstalk.hourly]$ ll
total 32
-rw-r--r-- 1 root root 157 Feb 11 00:54 logrotate.elasticbeanstalk.eb-engine.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.eb-hooks.conf
-rw-r--r-- 1 root root 171 Feb 15 11:28 logrotate.elasticbeanstalk.healthd.conf
-rw-r--r-- 1 root root 157 Feb 15 11:28 logrotate.elasticbeanstalk.nginx.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.web-stderr.conf
-rw-r--r-- 1 root root 156 Feb 11 00:54 logrotate.elasticbeanstalk.web-stdout.conf

여기에 logrotate 필요한 로그에 대한 설정을 다음과 같이 추가해주면 됩니다.

/var/log/myservice/*.log {
 su root root
 size 1M
 rotate 5
 missingok
 compress
 notifempty
 copytruncate
 dateext
 dateformat %s
 olddir /var/log/rotated
}

이 때 해당 설정 파일의 umask 는 444 나 644 가 되어야 합니다. 위의 설정도 .ebextention의 files 섹션에 내용을 추가함으로써 생성할 수 있습니다.

  "/etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.reqlog.conf" :
    mode: "000444"
    owner: root
    group: root
    content: |
      /var/log/lemontree-admin/*.log {
       su root root
       size 1M
       rotate 5
       missingok
       compress
       notifempty
       copytruncate
       dateext
       dateformat %s
       olddir /var/log/rotated 
      }

그럼 위의 설정만으로 과연 해당 로그가 logrotate가 되게 될까요? 위의 설정중에 olddir 을 보면 logrotate가 된 파일이 저장되게 됩니다. 위의 rotate 5 설정에 의해서 최고 5개만 남게 되죠.

/var/log/rotated 로 가보면 다음과 같이 파일 목록이 존재하게 됩니다. 파일 뒤에 붙는 이름의 형식은 위의 설정의 dateformat %s 를 사용하면 timestamp 로, 아니면 YYYYMMDD나 순서등으로 이름이 생성됩니다.

[ec2-user@ip-]$ logrotate.elasticbeanstalk.hourly]$ cd /var/log/rotated/
[ec2-user@ip rotated]$ ll
total 72
-rw-r--r-- 1 root   root   5444 Feb 11 02:42 eb-engine.log1644516061.gz
-rw-r--r-- 1 root   root   5678 Feb 12 01:36 eb-engine.log1644598861.gz
-rw-r--r-- 1 root   root   3888 Feb 12 13:46 eb-engine.log1644642061.gz
-rw-r--r-- 1 root   root   5664 Feb 12 14:25 eb-engine.log1644645661.gz
-rw-r--r-- 1 root   root   3899 Feb 15 11:28 eb-engine.log1644894061.gz
-rw------- 1 root   root   1851 Feb 16 20:31 web.stdout.log1645012861.gz
-rw------- 1 root   root   1843 Feb 16 21:57 web.stdout.log1645016462.gz
-rw------- 1 root   root   1693 Feb 16 22:44 web.stdout.log1645020061.gz
-rw------- 1 root   root   3905 Feb 16 23:43 web.stdout.log1645023661.gz
-rw------- 1 root   root   2076 Feb 17 00:55 web.stdout.log1645027261.gz

logrotate 를 사용할 경우, 만약에 해당 log 사이즈가 많고 지속적으로 데이터가 저장되고 있으면 logrotate가 발생하는 순간에 많은 부하가 발생할 수 있습니다. 이것은 logrotate의 동작 특성을 살펴보면 알 수 있는데 위의 설정에서 copytruncate 옵션을 주면 다음과 같이 처리되게 됩니다.

  • 먼저 logrotate 는 임시 파일을 생성합니다.
  • tmpfile의 이름을 필요한 형태로 변경합니다.
  • Myserver.log 의 내용을 Myserver.log.YYYYMMDD 로 복사합니다. 이 때 데이터의 양이 많으면 복사 작업 때문에 시스템 IO가 많이 발생하고 이때 서비스 timeout 과 로그 유실이 발생할 수 있습니다.
  • 최종적으로 Myserver.log 의 내용을 지우고 다시 해당 Myserver.log 에 로그를 쓰게 됩니다.

여기서 발생할 수 있는 부분에 대해서는 아래 블로그를 참고하면 될듯합니다.

https://brunch.co.kr/@alden/27 – 참고하고 보니, 갓앨든(강진우) 님의 브런치네요. 사랑해요 앨든!!!

그런데 이렇게 logrotate 설정만으로는 logrotate가 되지 않습니다. 실제로 해당 logrotate를 매 시간마다 실행시켜 주는 것은 cron 에 의해서 동작하고 있습니다. 그래서 실제로 cron 작업을 추가하는 마지막 작업이 있어야 합니다.

이것은 /etc/cron.hourly/ 를 살펴보면 쉽게 찾을 수 있습니다.

[ec2-user@ip- rotated]$ cd /etc/cron.hourly/
[ec2-user@ip- cron.hourly]$ ll
total 40
-rwxr-xr-x 1 root root 392 Jan 16  2020 0anacron
-rwxr-xr-x 1 root root 121 Feb 15 11:28 cron.logcleanup.elasticbeanstalk.healthd-proxy.conf
-rwxr-xr-x 1 root root 152 Feb 15 11:28 cron.logrotate.elasticbeanstalk.eb-engine.conf
-rwxr-xr-x 1 root root 151 Feb 15 11:28 cron.logrotate.elasticbeanstalk.eb-hooks.conf
-rwxr-xr-x 1 root root 150 Feb 15 11:28 cron.logrotate.elasticbeanstalk.healthd.conf
-rwxr-xr-x 1 root root 148 Feb 15 11:28 cron.logrotate.elasticbeanstalk.nginx.conf
-rwxr-xr-x 1 root root 149 Feb 12 18:07 cron.logrotate.elasticbeanstalk.reqlog.conf.bak
-rwxr-xr-x 1 root root 153 Feb 15 11:28 cron.logrotate.elasticbeanstalk.web-stderr.conf
-rwxr-xr-x 1 root root 153 Feb 15 11:28 cron.logrotate.elasticbeanstalk.web-stdout.conf

cron 설정을 보시면 다음과 같이 내용이 들어가 있습니다.

#!/bin/sh
test -x /usr/sbin/logrotate || exit 0
/usr/sbin/logrotate -f /etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.web-stdout.conf

이 해당 설정도 이제 files 섹션에 추가해주면 logrotate 설정이 완료하게 됩니다.

  "/etc/cron.hourly/cron.logrotate.elasticbeanstalk.myservice.conf" :
    mode: "000444"
    owner: root
    group: root
    content: |
      #!/bin/sh
      test -x /usr/sbin/logrotate || exit 0
      /usr/sbin/logrotate -f /etc/logrotate.elasticbeanstalk.hourly/logrotate.elasticbeanstalk.myservice.conf

이제 이것으로 기본적으로 서버의 설정은 끝났습니다.

  • 1부에서는 로그의 수집
  • 2부에서는 서버 안정성을 위한 logrotate 설정을 살펴보았습니다.

그런데 과연 이것만으로 다 된것일까요?

log의 retention 설정이 중요한데, cloudwatch 에 있는 log group의 retention 은 저희가 설정하기 나름입니다. 1일에서 영구적으로도 가능합니다. 해당 로그는 logs insight를 통해서 살펴 볼 수 있습니다. 다만 이 retention 기간이 지나면 없어지기 때문에 영구적으로 설정하거나, 잘 백업을 해둬야 합니다.

사실 로그는 이렇게 확인을 하기 위해서도 저장하지만, 분석한 필요한 정보들을 포함하고 있기 때문에, 분석을 위해서 이를 저장할 필요가 있습니다. AWS에서 가장 쉽게 사용하는 분석툴은 athena 나 EMR을 이용해서 분석을 해야 할텐데요. 결국 이를 위해서는 cloudwatch log stream에 있는 로그들을 s3에 적절한 형태로 저장할 필요가 있습니다.

3부에서는 이 부분에 대해서 다뤄보도록 하겠습니다.

[입 개발] 스타트업을 위한 AWS 로그 시스템 Part #1

스타트업에 다니고 있다보니 당장 필요하지는 않지만, 로그 수집 시스템을 구축해야 했습니다.(당장 필요하지 않은데 왜 하는거지!!!) 라는 의문이 들 수 있는데…(들어야 정상입니다.), 사실 꼭 초반에 구성하지 않더라도 큰 문제는 없습니다. 아직 저희는 서비스가 출시 전이기 때문에, 출시 전에 로그를 수집하더라도 큰 문제는 없지만, 다음과 같은 생각이 있었습니다.(만구 제 생각입니다.)

  • 서비스 출시 전으로 갈 수록 서비스 관련 이슈가 많으므로 로그 시스템을 구축할 수 있는 시간이 줄어들 것이다.
  • 그러면, 실제로 필요한 로그들을 정리하기 어려워서 놓치는 부분들이 생기고, 필요한 로그들이 있는지 체크하기 어려울 것이다.

그래서 일단은 로그 수집 시스템을 구축하려고 했는데, 그렇다고 기존에 다른 회사에서 사용하던 로그 수집 시스템을 그대로 사용하기에는 관리 비용이 비싸다고 생각했습니다.

beanstalk 에서도 자체적으로 로그를 보여주게 되는데, Beanstalk 에서 로그가 보고싶은 environment를 선택하고 Logs 를 선택합니다.

그리고 Request Logs 를 선택하고 Last 100 Lines 나 Full Logs를 선택해서 원하는 로그를 볼 수 있습니다.

개발중에는 대략 이렇게 로그를 볼 수도 있습니다. 그런데 서비스를 진행하면서 이렇게 로그를 볼 수 는 없습니다. 왜냐하면 로그는 서비스의 현재와 과거의 모습, 그리고 우리가 어떻게 성장을 해야 하는지를 보여주는 핵심 지표이기 때문입니다.

예를 들어 로그를 모아서 다음과 같은 정보들을 나중에 보여주고 싶습니다.

  • 현재 서비스의 사용자 수(DAU, WAU, MAU)
  • 현재 가장 많이 발생하는 Exception(장애시 이런게 엄청 중요합니다.)
  • 어떤 이벤트로 변하는 사용자 수

그래서 로그를 수집하는 일반적인 파이프라인은 다음과 같은 형태를 많이 사용합니다. 그리고 저기서 ELK 가 붙어서 비주얼라이제이션까지 처리를 많이 하게 되죠.

처음에는 위의 형태를 사용할까 생각을 했는데 몇 가지 고민이 생겼습니다.

  • 우리는 AWS Beanstalk 를 쓰고 있다.
    • beanstalk 라고 filebeat를 설치하는 것이 어렵지는 않습니다.
  • Kafka를 써야 하나?
    • Kafka는 굉장히 좋은 툴이고, AWS는 Managed Kafka 까지 제공해주고 있습니다. 그럼에도 불구하고, 우리가 Kafka의 운영에 신경을 써야 한다는 것이 고민 포인트였습니다.
  • ELK를 다 써야 하나?
    • elastic도 굉장히 좋은 솔루션인데, 스타트업에서는 살짝 고민되는 비용이기도 합니다.
    • 사실, 반대로는 스타트업이라서 로그가 별로 없어서 써도 상관없는 비용이기도 한…(무슨 소리냐!!!)

그래서 이것 저것을 조사하다 보니, 소규모에는 CloudWatch Log 정도로도 충분할꺼라는 지인들의 조언을 많이 받게 되어서 (DataDog이나 어려가지 SaaS 솔루션의 추천도 많고, 예전에 써본 경험도 있긴 한데… 흑흑흑 쓸 수 없었던…) 해당 솔루션 들을 리서치 하게 되었다.

그래서 현재는 (아직 구축이 끝나지 않았다!!!) 다음과 같다. 현재 필요한 로그들을 cloudwatch 에 까지 보내는 것은 테스트가 되었고, 뒷 부분에 lambda 로 s3에 저장을 할지, firehose 로 저장을 할지를 고민중이다.(이것도 사실 테스트는 되었는데……)

즉 지금 보는 솔루션은 beanstalk 에 awslogs 와 cwlogs 를 통해서 로그를 cloudwatch 로 저장하고 이를 다시 s3에 저장한 다음 분석 작업을 진행하는 것이다.(분석 작업은 현재 없다!!! 엉???)

그런데 beanstalk 가 조금 애매한 부분이 terraform 으로 environment 는 설정이 가능한데, 거기의 세세한 설정은 또 다른 방식으로 해야하는 것들이 있습니다. 예를 들어, 기본적으로 바로 beanstalk를 사용한다면, 기본 로그들 말고 우리가 남겨야 하는 custom log 들이 있습니다. 이런 것들을 어떻게 수집해야 할까요?

사실 beanstalk에서는 아주 간단하게 적용할 수 있는 방법을 제공합니다. 첫번째는 환경 설정에서 로그를 남기는 설정을 켜두는 것입니다.

Beanstalk 설정에서 Configuration 을 선택하고 Software 에서 Edit 를 누르면 설정을 쉽게 할 수 있습니다. 아래에서 Enabled 만 설정하시면 자동으로 cloudwatch에 로그가 남게 됩니다.

위의 설정을 켜고 CloudWatch 메뉴로 가서 Log groups 를 선택하게 되면 바로 기본적으로 로그가 남는 것을 알 수 있습니다. 예를 들어 자바 App을 구성하시면 5개의 로그가 기본적으로 생깁니다.(nginx 는 nginx를 proxy로 쓴다면…)

  • /aws/elasticbeanstalk/{environment}/var/log/eb-engine.log
  • /aws/elasticbeanstalk/{environment}/var/log/eb-hooks.log
  • /aws/elasticbeanstalk/{environment}/var/log/nginx/access.log
  • /aws/elasticbeanstalk/{environment}/var/log/nginx/error.log
  • /aws/elasticbeanstalk/{environment}/var/log/web.stdout.log

그런데 사실, 모든 출력을 web.stdout.log 로 출력을 해도 되지만, 관리를 위해서 필요한 로그를 추가를 하게 됩니다.

그렇다면 이 것은 어떻게 추가를 해야할까요? 일단 수동으로 하는 건 최대한 피해야 하고(수동으로 할 수 있는 방법도 없는 T.T)

beanstalk 는 이걸 위해 .ebextensions 에 확장을 위한 방법을 제공합니다. 사실 이미 aws 쪽에서는 이에 대한 Sample 을 제공하고 있습니다.(https://github.com/awsdocs/elastic-beanstalk-samples/blob/master/configuration-files/aws-provided/instance-configuration/logs-streamtocloudwatch-linux.config)

일단 초기에는 다음과 같이 설정했습니다.

packages:
  yum:
    awslogs: []

commands:
  01-awslog:
    command: systemctl enable awslogsd.service
  02-awslog:
    command: systemctl restart awslogsd


files:
  "/etc/awslogs/awslogs.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [general]
      state_file = /var/lib/awslogs/agent-state

  "/etc/awslogs/config/myservice.conf":
    mode: "000600"
    owner: root
    group: root
    content: |
      [/var/log/myservice/myservice.log]
      log_group_name = `{"Fn::Join":["/", ["/aws/elasticbeanstalk", "myservice", "var/log/myservice/myservice.log"]]}`
      log_stream_name = {instance_id}
      file = /var/log/myservice/myservice.log

  "/opt/elasticbeanstalk/tasks/bundlelogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

  "/opt/elasticbeanstalk/tasks/taillogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

위의 설정을 살펴보면 packages 는 Amazon linux 2 부터는 awslogs 가 설치가 안되어 있기 때문에 기본적으로 설치를 해주는 부분입니다.

packages:
  yum:
    awslogs: []

두 번째 commands 는 awslogs 를 설정하고 재시작 하는 부분입니다.

commands:
  01-awslog:
    command: systemctl enable awslogsd.service
  02-awslog:
    command: systemctl restart awslogsd

세 번째 files 섹션은 해당 파일들을 생성합니다. 여기에 적는 파일들은 설정파일들입니다. awslogs.conf 는 state_file을 저장하는 해당 파일은 filebeat 같은 종류의 파일들이 현재 데이터를 어디까지 보냈는지를 기록해두는 것처럼 awslogs가 보통 파일의 어디까지를 전송했는지 position 값을 가지고 있습니다.

  "/etc/awslogs/awslogs.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [general]
      state_file = /var/lib/awslogs/agent-state

이제 custom 로그를 cloudwatch 로 보내는 설정입니다. /etc/awslogs/config 에 들어가는 설정이 cloudwatch로 보내는 설정들이라고 보시면 됩니다. 혹시나 추가하고 싶은 파일들이 더 있으면 더 만들어주거나 하면 됩니다.

  "/etc/awslogs/config/myservice.conf":
    mode: "000600"
    owner: root
    group: root
    content: |
      [/var/log/myservice/myservice.log]
      log_group_name = `{"Fn::Join":["/", ["/aws/elasticbeanstalk", "myservice", "var/log/myservice/myservice.log"]]}`
      log_stream_name = {instance_id}
      file = /var/log/myservice/myservice.log

그리고 아래의 두 설정은 아까 beanstalk environment 설정에서 본 request logs 에서 Last 100 라인과 모두 보기를 위해서 추가해주는 설정입니다. 저기에 파일 내용을 보고 해당 파일들의 내용을 보여줍니다.

"/opt/elasticbeanstalk/tasks/bundlelogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

  "/opt/elasticbeanstalk/tasks/taillogs.d/myservice.conf" :
    mode: "000755"
    owner: root
    group: root
    content: |
      /var/log/myservice/*.log

이렇게 실행을 시키고 보면, 실행이 안되었습니다. 이유를 살펴보니, Log를 생성해야 하는 폴더가 없고 권한이 없어서 App이 실행이 안되었습니다. 그래서 commands 섹션을 다음과 같이 바꿔줍니다. 필요한 폴더를 생성해주는 것입니다.

commands:
  01-directory:
    command: "sudo mkdir -p /var/log/myservice/"
  02-directory:
    command: "sudo chmod 777 /var/log/myservice/"
  03-awslog:
    command: systemctl enable awslogsd.service
  04-awslog:
    command: systemctl restart awslogsd

이렇게 하고 보니, 이제 beanstalk 환경에서는 마지막 100라인등의 로그에는 잘 추가가 되었는데 cloudwatch에는 보이지 않았습니다. 이상하다 싶어서 설정을 바꿔보고 이름을 바꿔봐도 T.T 로그가 보이지 않았습니다.

이때 문제를 확인하는 방법중에 하나가 직접 해당 인스턴스에 들어가서 설정 파일들을 살펴보는 것입니다. ssh로 접근을 해서 설정을 보아도 크게 이상한게 없었는데…(제 눈이 침침해서…)

위에서 AWS 공식 예제와 다른 부분을 살펴보면 다음 부분이 있습니다. 제가 실수했던 부분은 해당 파일이 있는지만 확인하고 넘어간 것이었는데…

  "/etc/awslogs/awscli.conf" :
    mode: "000600"
    owner: root
    group: root
    content: |
      [plugins]
      cwlogs = cwlogs
      [default]
      region = `{"Ref":"AWS::Region"}`

ssh 로 직접 들어가서 해당 파일을 확인해보니… 파일 내용이 다음과 같았습니다.

[plugins]
cwlogs = cwlogs
[default]
region = us-east-1

네네, 제 Region은 ap-northeast-2 인데… 저 설정이 us-east-1 입니다. 제가 아까 awslogs 에 있는 설정은 cloudwatch 로그를 위한 것이라고 했습니다. cwlogs 가 cloudwatch logs인거죠.

그런데 이상한 부분은 일단 기존의 다른 로그들은 ap-northeast-2로 가고 있는데, custom 로그만 머나먼 버지니아(us-east-1)로 가고 있던 것입니다. 그래서 해당 region 의 cloudwatch를 보니… 로그가 거기에 쌓이고 있는…

이렇게 cloudwatch에 로그가 쌓이는 걸 확인했지만 사실 여기서 cloudwatch에 데이터를 보내는 것이 완료된 것이 아닙니다. 어떤 부분이 남아있을까요? 그것은 2부에서…

[입 개발] Redis LRU(Least Recently Used Algorithm)에 대해서

Redis 는 데이터를 영구적으로 저장하는 Persistent Store 의 역할도 하지만, 주로 데이터의 접근을 빠르게 하기 위한 Cache 로 많이 사용됩니다.

Redis 는 메모리를 데이터 저장소로 이용하기 때문에, Disk를 사용하는 다른 솔루션 보다 적은 양의 데이터를 저장하게 되고, 이로 인해서 최대치 까지 데이터를 저장하면, 새로운 데이터를 저장하기 위해서 기존 데이터를 버려야 하는 작업을 해야 합니다. 이를 eviction 이라고 부릅니다. – 다만 Expire 가 되어서 사라지는 것은 Eviction 이라고 부르지 않습니다. (실제로 Redis Enterprise 솔루션을 보면 이렇게 eviction 되어야 하는 데이터를 flash disk 에 저장해서 좀 더 빠른 접근을 하게 하는 솔루션도 있습니다.)

eviction 을 위해서 사용하는 방식중에 가장 일반적인 방식중에 하나가 LRU 입니다. LRU는 Least Recently Used Algorithm 으로, 가장 오랫동안 참조되지 않은 페이지를 교체하는 기법입니다. 보통 OS에서 페이징에서 사용하는 메모리를 교체하는 방식에서 사용되고 있으면 보통 페이지 교체 알고리즘은 다음과 같습니다.

알고리즘비고
FIFO(First In First Out)가장 먼저 메모리에 올라온 페이지를 교체
* Belady’s Anomaly(FIFO anomaly)가 발생할 수 있음
OPT(OPTimal Page Replacement)앞으로 가장 오랫동안 사용되지 않을 페이지를 교체
* 앞으로 프로세스가 사용할 페이지를 미리 알아야해서 불가능
LRU(Least Recently Used)가장 오랫동안 사용되지 않은 페이지를 교체
Count-BasedOPT와 비슷한 성능을 보여주고 같은 이유로 현실적으로 사용불가능
LFU(Least Frequently Used)참조 횟수가 가장 적은 페이지를 교체
MFU(Most Frequently Used)참조 횟수가 가장 많은 페이지를 교체
NUR(Not Used Recently)최근에 사용하지 않은 페이지를 교체(클럭 알고리즘)
Random아무거나 교체…

Redis 에서 지원해주는 알고리즘은 LRU와 LFU, RANDOM의 3가지를 제공하고 있습니다. 오늘은 여기서 LRU를 살펴보도록 하겠습니다.

보통 LRU를 구현하는 방식은 다음과 같습니다.

  • Page에 데이터를 접근한 시간에 대한 시간 데이터를 저장해서 해당 값이 가장 오래된 페이지를 교체
  • Page를 List 형태로 저장한 다음 사용된 Page를 항상 List 의 제일 앞으로 올리고, 데이터가 존재하지 않으면 가장 끝의 데이터를 삭제하고, 새로운 데이터를 List의 위에 올리는 방식으로 구현

다음 그림을 살펴보면 가장 사용되지 않은 페이지들이 교체되는 것을 볼 수 있습니다.

다시 Redis 로 돌아와서 Redis 에서 이런 Eviction 이 동작하게 하기 위해서는 maxmemory 를 설정해야 합니다. maxmemory 설정이 없으면 32bit 에서는 3GB로 제한이 자동으로 설정되고 64bit 에서는 메모리가 부족할 때 까지 계속 저장하게 됩니다.

maxmemory 10G

Redis Evicition Policy

Redis 에서는 다음과 같은 Evicition 정책을 제공합니다.

Policy비고
noeviction설정된 메모리 한계에 도달하면, 데이터 추가 명령 실행이 실패하게 됩니다.
allkeys-lruLRU 정책을 모든 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다.
volatile-lruLRU 정책을 Expire 가 설정된 Key에 대해서 적용해서 데이터를 삭제하게 됩니다
allkeys-random모든 Key에 대해서 Random 하게 데이터를 삭제하게 됩니다.
volatile-randomExipre 가 설정된 Key 에 대해서 Random 하게 삭제하게 됩니다.
allkeys-lfuLFU 정책을 모든 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다.
volatile-lfuLFU 정책을 Expire 가 설정된 Key 에 대해서 적용해서 데이터를 삭제하게 됩니다
volatile-ttlExpire 가 설정된 key 에 대해서 ttl이 짧은 순서로 먼저 삭제 하게 됩니다.

Eviction Process 어떻게 동작하는가?

Redis 에서 Evcition 은 메모리가 부족할 때 동작하게 되는데 다음과 같습니다.

  • 클라이언트 명령을 실행했을 때
  • Redis 가 메모리를 체크했을 때 현재 사용량이 maxmemory 설정보다 높을 때
  • 새로운 커맨드가 실행되었을 때

보통 Redis 에서 Eviction 은 server.c 에서 processCommand 함수 안에서 performEvictions 함수를 호출합니다.

    if (server.maxmemory && !scriptIsTimedout()) {
        int out_of_memory = (performEvictions() == EVICT_FAIL);

        /* performEvictions may evict keys, so we need flush pending tracking
         * invalidation keys. If we don't do this, we may get an invalidation
         * message after we perform operation on the key, where in fact this
         * message belongs to the old value of the key before it gets evicted.*/
        trackingHandlePendingKeyInvalidations();

        /* performEvictions may flush slave output buffers. This may result
         * in a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand &&
            c->cmd->proc != quitCommand &&
            c->cmd->proc != resetCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * until first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand ||
            c->cmd->proc == evalShaCommand ||
            c->cmd->proc == fcallCommand ||
            c->cmd->proc == fcallroCommand)
        {
            server.script_oom = out_of_memory;
        }
    }

evict.c 의 performEvictions 함수는 다음과 같습니다. performEvictions 함수는 상당히 길고 복잡한 동작을 진행합니다.

int performEvictions(void) {
    /* Note, we don't goto update_metrics here because this check skips eviction
     * as if it wasn't triggered. it's a fake EVICT_OK. */
    if (!isSafeToPerformEvictions()) return EVICT_OK;

    int keys_freed = 0;
    size_t mem_reported, mem_tofree;
    long long mem_freed; /* May be negative */
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = EVICT_FAIL;

    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) {
        result = EVICT_OK;
        goto update_metrics;
    }

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
        result = EVICT_FAIL;  /* We need to free memory, but policy forbids. */
        goto update_metrics;
    }

    unsigned long eviction_time_limit_us = evictionTimeLimitUs();

    mem_freed = 0;

    latencyStartMonitor(latency);

    monotime evictionTimer;
    elapsedStart(&evictionTimer);

    /* Unlike active-expire and blocked client, we can reach here from 'CONFIG SET maxmemory'
     * so we have to back-up and restore server.core_propagates. */
    int prev_core_propagates = server.core_propagates;
    serverAssert(server.also_propagate.numops == 0);
    server.core_propagates = 1;
    server.propagate_no_multi = 1;

    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[bestdbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[bestdbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
            decrRefCount(keyobj);
            keys_freed++;

            if (keys_freed % 16 == 0) {
                /* When the memory to free starts to be big enough, we may
                 * start spending so much time here that is impossible to
                 * deliver data to the replicas fast enough, so we force the
                 * transmission here inside the loop. */
                if (slaves) flushSlavesOutputBuffers();

                /* Normally our stop condition is the ability to release
                 * a fixed, pre-computed amount of memory. However when we
                 * are deleting objects in another thread, it's better to
                 * check, from time to time, if we already reached our target
                 * memory, since the "mem_freed" amount is computed only
                 * across the dbAsyncDelete() call, while the thread can
                 * release the memory all the time. */
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }

                /* After some time, exit the loop early - even if memory limit
                 * hasn't been reached.  If we suddenly need to free a lot of
                 * memory, don't want to spend too much time here.  */
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    // We still need to free memory - start eviction timer proc
                    startEvictionTimeProc();
                    break;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    /* at this point, the memory is OK, or we have reached the time limit */
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;

cant_free:
    if (result == EVICT_FAIL) {
        /* At this point, we have run out of evictable items.  It's possible
         * that some items are being freed in the lazyfree thread.  Perform a
         * short wait here if such jobs exist, but don't wait long.  */
        if (bioPendingJobsOfType(BIO_LAZY_FREE)) {
            usleep(eviction_time_limit_us);
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
            }
        }
    }

    serverAssert(server.core_propagates); /* This function should not be re-entrant */

    /* Propagate all DELs */
    propagatePendingCommands();

    server.core_propagates = prev_core_propagates;
    server.propagate_no_multi = 0;

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);

update_metrics:
    if (result == EVICT_RUNNING || result == EVICT_FAIL) {
        if (server.stat_last_eviction_exceeded_time == 0)
            elapsedStart(&server.stat_last_eviction_exceeded_time);
    } else if (result == EVICT_OK) {
        if (server.stat_last_eviction_exceeded_time != 0) {
            server.stat_total_eviction_exceeded_time += elapsedUs(server.stat_last_eviction_exceeded_time);
            server.stat_last_eviction_exceeded_time = 0;
        }
    }
    return result;
}

메모리가 충분히 확보가 될 때까지 반복하면서, 먼저 eviction 을 위한 bestKey를 찾고 이것을 eviction 하게 됩니다. 처음에는 evictionPoolPopulate 라는 함수를 통해서 eviction pool 에 샘플링 데이터를 추가하게 됩니다. 샘플링 방법은 랜덤하게 데이터를 가져와서 idle time 을 구하고 여기서 이 idle 값을 가지고 eviction pool을 채우게 됩니다.

/* This is a helper function for performEvictions(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time bigger than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. */

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where an higher score means better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency subtracting the actual frequency to the maximum
             * frequency of 255. */
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert.  */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */

                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimization bla bla bla. */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}

그럼 이제 eviciton 동작은 알았는데, evcition 을 위한 값은 언제 설정하게 될까요? Redis 내부에는 실제 정보를 가지는 redisObject 라는 타입이 있습니다. LRU_BITS 는 24bits 입니다.

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

그럼 이 LRU 값은 언제 업데이트가 될까요? 실제로 db.c 파일의 lookupKey 를 보면 거기서 접근할 때 또는 생성시에 LRU_CLOCK이라는 함수를 통해서 업데이트가 되어집니다.

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    robj *val = NULL;
    if (de) {
        val = dictGetVal(de);
        int force_delete_expired = flags & LOOKUP_WRITE;
        if (force_delete_expired) {
            /* Forcing deletion of expired keys on a replica makes the replica
             * inconsistent with the master. The reason it's allowed for write
             * commands is to make writable replicas behave consistently. It
             * shall not be used in readonly commands. Modules are accepted so
             * that we don't break old modules. */
            client *c = server.in_script ? scriptGetClient() : server.current_client;
            serverAssert(!c || !c->cmd || (c->cmd->flags & (CMD_WRITE|CMD_MODULE)));
        }
        if (expireIfNeeded(db, key, force_delete_expired)) {
            /* The key is no longer valid. */
            val = NULL;
        }
    }

    if (val) {
        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }

        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_hits++;
        /* TODO: Use separate hits stats for WRITE */
    } else {
        if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_misses++;
        /* TODO: Use separate misses stats and notify event for WRITE */
    }

    return val;
}

그리고 LRU_CLOCK 은 다음과 같이 구현되어 있습니다.

/* Return the LRU clock, based on the clock resolution. This is a time
 * in a reduced-bits format that can be used to set and check the
 * object->lru field of redisObject structures. */
unsigned int getLRUClock(void) {
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

/* This function is used to obtain the current LRU clock.
 * If the current resolution is lower than the frequency we refresh the
 * LRU clock (as it should be in production servers) we return the
 * precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
    unsigned int lruclock;
    if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
        atomicGet(server.lruclock,lruclock);
    } else {
        lruclock = getLRUClock();
    }
    return lruclock;
}

그런데 제가 위에서 Redis LRU는 샘플링을 해서 데이터를 가져와서 이를 처리한다고 했습니다. evictionPoolPopulate 함수였죠. 그래서 Redis의 LRU는 엄밀한 LRU가 아니라 Approximated LRU algorithm을 구현하고 있습니다. 즉 LRU를 위해서 Best Candidate 을 구하는 것이 아니라, 적당히 구해온다는 것입니다.(Redis 는 그냥 Sampling Key를 통해서 가져옵니다.)

Redis 2.x(지금은 6.0 시대!!!) 에서 Redis 3.0으로 넘어가면서 좀 더 실제에 가까운 LRU 알고리즘으로 구현이 바뀌었다고 합니다.(지금 코드는 6.0 기반이라… 좀 더 개선이?) 아래 그림에 대해서 다음과 같이 설명이 되어있습니다.

  • 밝은 회색 밴드는 Eviction 된 object
  • 회색 밴드는 Evcition 되지 않은 object
  • 녹색 밴드는 추가된 object

sampling이 5로 되어있는 것을 10으로 바꾸면 좀 더 좋은 결과를 보여주는데, 이 때 CPU 사용량이 더 늘어난다고 합니다.

LFU MODE

Redis 4.0 부터는 LFU(Least Frequently Used Eviction Mode) 가 추가되었습니다. LRU에 비해서 좀 더 좋은 성능을 보여준다고 하는데요. 이 부분은 다음번에 추가로 설명을 드리도록 하겠습니다. 참고로 hot keys 를 보기위해서는 LFU 모드를 사용해야만 하고 object freq <key> 명령을 통해서 hot key를 대략적으로 찾을 수 있습니다.

Reference

[입 개발] AWS VPC Peering

AWS VPC Peering 을 위해서 체크해야 하는 부분

  1. A VPC 에서 B VPC로 VPC Peering 신청(다른 계정의 VPC도 가능)
  2. B VPC에서는 VPC Peering 수락
  3. B VPC와 통신해야 하는 A VPC의 모든 Subnet 에 B VPC로의 routing 설정
    1. 해당 Subnet 에 할당된 Route Table 들에 추가해 주면 된다.
    2. 항목은 Peering 선택
    3. 예를 들어 A VPC private, public, db 등의 3개의 subnet 이 있을 경우, A private 에서 B private 로만 접근이 필요하면 A private subnect 과 연결된 route table 에 B VPC Peer로 향한 라우팅 정보를 추가하면 된다.
  4. A VPC와 통신해야 하는 B VPC의 모든 Subnect 에 A VPC로의 routing 설정
  5. 각기 접근 가능한 Security Group 설정을 해주어야 한다. 서로 접근이 가능하도록

이는 AWS에서 VPC Peering 은 VPC 단위로 라우팅은 subnet 단위로 이루어지기 때문이다.

[입 개발] MariaDB Connector 와 AWS Aurora

먼저 저는 자바를 잘 모르고 AWS Aurora도 잘 모르고 MariaDB Connector도 잘 모르는 초초보에 자바맹인것을 먼저 밝히고 해당 글을 작성합니다.

지인 분의 서비스가 Aurora RDS Mysql 을 쓰다가 Failover 를 한다고 해서 뭔가 잘못된 정보를 드렸다가, 자세히 보다보니, AWS Aurora의 Manual Scale Up을 하기 위해서는 다음과 같이 하면 된다고 합니다.

https://svrlove.blogspot.com/2019/05/aws-aurora-rds.html

요약하면, Reader를 먼저 Scale Up 하고 Failover 하면 된다고, 그냥 하면 될꺼라고 알려드려서 죄송합니다. 흑…

그런데 그 얘기를 듣고 나서, 다시 들은 얘기가 select 가 Reader로 가고 있다라는 것이었습니다. 이게 제가 듣기에는 완전히 이상한게… Connector 가 단순히 자동으로 Write는 Primary에게, Read는 Replica 에게 쿼리를 전달해 준다면, 굉장히 편리하긴 한데(완전히 다 좋은 건 아니지만…), 이렇게 될 경우 다음과 같이 Replication Lag가 발생하면, Write 후에 Read를 할 경우 새롭게 Write한 데이터가 아니라, 과거의 데이터를 읽을 수 있습니다. 다음과 같이 Primary DB에 A,B가 저장되면 이것은 Replica 에 다시 A,B가 저장되게 되는데 만약 Replica의 처리가 늦어져서 A, B가 아직 Replica에 반영되지 않았다면, 우리가 A를 select 하더라도 Replica에서는 데이터가 없다고 나올 것입니다.

그래서 이런 부분은 설정으로 명시적으로 이루어져야 사용자의 실수할 여지가 줄어들게 됩니다. 그런데 지인 분의 서버는 Select를 자동으로 Replica에서 했다고 하는 것입니다. 아무런 설정 없이…

그래서 먼저 관련 자료를 찾다보니, 인프런의 CTO 이신 창천항로님의 글이 보입니다. 일단 먼저 읽어보시길 권장합니다.

여길 보면 Aurora를 쓰면 @Transactional(readOnly=true) 를 붙여주면 자동으로 Reader 로 연결이 된다는 것입니다. 이걸 보고 제가 외친 한마디는 “이게 말이되?”, 어떻게 Spring JPA 쪽에서 Connection 정보를 알아서 연결이 됨? 이라는 의문에 쌓이게 되었습니다.

기본적으로 JPA에서 DataSource를 하나만 만들어주고 readOnly=true 가 셋팅이 되면 Connection에 Read Only를 설정하는 것으로 알고 있습니다.(그래봤자… DataSource 가 하나면 하나만 와야… 엉?)

그래서 일단 MariaDB Connector 소스를 받아봤습니다.(이때, 좀 더 조사부터 했어야 하는데… 내가 미…) MariaDB Connector는 다음 github에서 받을 수 있습니다. (https://github.com/mariadb-corporation/mariadb-connector-j)

그리고 열심히 소스를 보는데…(솔직히… 봐도 모르는…), 그러다가 먼저 CHANGELOG.md 파일을 보니 Aurora로 기능이 들어온 것들이 있습니다. 그중에 Relesae 1.5.1 관련해서 CONJ-325의 제목이 Aurora host auto-discovery 입니다. 이거다 싶어서 Aurora로 소스 코드를 검색해도 아무런 내용이 없습니다.

그래서 삽질로 spring-cloud-aws 의 JDBC 도 보러 갔다가… 아닌듯하여… 돌아오게 되는… 흑…(몰라~~~ 알수가 없어…)

그런데 지인분이 다음 링크를 던져주면서, LoadBalacing 기능이 Connector에 있다는 것을 알게 되었습니다.(https://mariadb.com/kb/en/failover-and-high-availability-with-mariadb-connector-j/?fbclid=IwAR2EnwLRBGc1T0bQLJTloP9WnisrjM0smV2h4bGa23UcT9Teq55gYVkwctI)

아래 내용을 보시면 이렇게 호스트 주소를 여러개 적어두면, 첫번째는 Primary, 나머지는 Replica로 동작한다는 것을 알 수 있고, @Transactional(readOnly=true) 를 설정하면 readOnly 용 Connection을 가져가서 아까 말한 것과 유사한 상황이 발생한다는 것을 알게 되었습니다.

여기까지 보면, 사실 눈치챈 분들도 계시겠지만, 저는 아주 초초보라, “jdbc:mysql://host/test” 이런 형태의 endpoint 만 사용해 보아서 사실… 전혀 눈치를 못챘던…

그런데, 이제 지인분이 다시 얘기해주시는 게 보이는데, connection url 에 “aurora” 가 포함되어 있다고 하시는 겁니다. 엉, 이게 뭐야… 하면서 위의 endpoint url을 다시보니… “replication” 이라는게 포함되어 있습니다. 그리고 다시 창천항로 님의 글을 보니… 거기도 jdbc url 이 “jdbc:mysql:aurora:…..” 이런식으로 되어있습니다.

그런데 최신 소스에서 아무리 검색해도 aurora라는 파일이나 String이 존재하지가 않습니다.(CHANGLELOG.md 제외) 그런데 replication 으로 검색을 해보니, HaMode.java(./src/main/java/org/mariadb/jdbc/export/HaMode.java) 라는 파일이 발견이 됩니다. 아래 내용을 보면 뭔가 Ha 관련 설정이 있는 것이 보입니다. 위의 jdbc url 의 HaMode 와 관련해서 뭔가 설정을 할 것 같은 부분이 보이는 거죠.

public enum HaMode {
  REPLICATION("replication") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      return HaMode.getAvailableHostInOrder(hostAddresses, denyList, primary);
    }
  },
  SEQUENTIAL("sequential") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      return getAvailableHostInOrder(hostAddresses, denyList, primary);
    }
  },
  LOADBALANCE("load-balance") {
    public Optional<HostAddress> getAvailableHost(
        List<HostAddress> hostAddresses,
        ConcurrentMap<HostAddress, Long> denyList,
        boolean primary) {
      // use in order not blacklisted server
      List<HostAddress> loopAddress = new ArrayList<>(hostAddresses);
      loopAddress.removeAll(denyList.keySet());
      Collections.shuffle(loopAddress);

      return loopAddress.stream().filter(e -> e.primary == primary).findFirst();
    }
  },

그리고 HaMode.java 에서 from이란 함수를 보면 다음과 같습니다. 이게 분명히 aurora 라는 문자가 있으면 HaMode 로 바꿔줘야 하는데 그런게 없네요.

  public static HaMode from(String value) {
    for (HaMode haMode : values()) {
      if (haMode.value.equalsIgnoreCase(value) || haMode.name().equalsIgnoreCase(value)) {
        return haMode;
      }
    }
    throw new IllegalArgumentException(
        String.format("Wrong argument value '%s' for HaMode", value));
  }

그래서 해당 github에서 해당 파일의 History를 뒤지다 보니 뭔가 이상합니다. 파일의 기록이 특정 시점 이전이 없는… -_- 그리고 버전을 보니 master branch 는 3.0.x 입니다. 그런데 주변에서 많이 사용하는 건 2.7.x네요. 급히 소스를 바꾸고 검색을 해보니… HaMode.java 가 옮겨졌는데 다음 위치에 있습니다. 우리가 찾던 AURORA가 있는 것이 보입니다. (./src/main/java/org/mariadb/jdbc/internal/util/constant/HaMode.java)

public enum HaMode {
  AURORA,
  REPLICATION,
  SEQUENTIAL,
  LOADBALANCE,
  NONE
}

그럼 관련 호스트 목록은 어디서 가져오는가? 2.7.x대에는 ./src/main/java/org/mariadb/jdbc/internal/failover/impl/AuroraListener.java 라는 파일이 존재하고 AuroraListener 에서 information_schema.replica_host_status 테이블에서 server_id에서 endpoint를 생성할 수 있습니다.

  private List<String> getCurrentEndpointIdentifiers(Protocol protocol) throws SQLException {
    List<String> endpoints = new ArrayList<>();
    try {
      proxy.lock.lock();
      try {
        // Deleted instance may remain in db for 24 hours so ignoring instances that have had no
        // change
        // for 3 minutes
        Results results = new Results();
        protocol.executeQuery(
            false,
            results,
            "select server_id, session_id from information_schema.replica_host_status "
                + "where last_update_timestamp > now() - INTERVAL 3 MINUTE");
        results.commandEnd();
        ResultSet resultSet = results.getResultSet();

        while (resultSet.next()) {
          endpoints.add(resultSet.getString(1) + "." + clusterDnsSuffix);
        }

        // randomize order for distributed load-balancing
        Collections.shuffle(endpoints);

      } finally {
        proxy.lock.unlock();
      }
    } catch (SQLException qe) {
      logger.warning("SQL exception occurred: " + qe.getMessage());
      if (protocol.getProxy().hasToHandleFailover(qe)) {
        if (masterProtocol == null || masterProtocol.equals(protocol)) {
          setMasterHostFail();
        } else if (secondaryProtocol.equals(protocol)) {
          setSecondaryHostFail();
        }
        addToBlacklist(protocol.getHostAddress());
        reconnectFailedConnection(new SearchFilter(isMasterHostFail(), isSecondaryHostFail()));
      }
    }

    return endpoints;
  }

그래서 결론부터 말하자면 jdbc url 의 endpoint 에 “jdbc:mysql:aurora” 라고 설정을 하면 mariadb connector 가 내부적으로 저런 작업을 다 해주게 됩니다. 그게 아니라면 HaMode의 다른 값을 보고 적절히 설정하시면 원하는 형태로 나눌 수 가 있게 됩니다. 코드를 대충 보셨으니 저 값을 쓰면 어떻게 되겠구나라고 보시면 될듯 합니다. 참고로 저런 정책을 파싱하는 부분은 ./src/main/java/org/mariadb/jdbc/UrlParser.java 를 보시면 잘 나와있습니다.

즉 aurora를 안 붙이면, 저렇게 동작하지 않는다는 얘기…

그런데… One More Thing…

MariaDB Connector 가 있고, Mysql Connector 가 있습니다. Mysql Connector는 과연 같은 걸 지원할까요? 그래서 Mysql Connector 소스도 까 보았는데, 재밌는건 “jdbc:mysql:[replication|loadbalance|failover]” 는 공통으로 지원이 됩니다. 그런데 Aurora는 안보이더군요.

사실 이 두 개의 Driver를 바꾸면 다음과 같이 Timestamp나 여러가지 다른 이슈들이 있을 수 있으므로 확인이 필요합니다.

다 끝나고 나서, 좋은 레퍼런스들이 나오기 시작했는데, 기계인간 이종립 님의 문서도 좋았습니다.

솔직히 저는 저런 문법을 처음 보았기 때문에 흑… 이해를 잘 못했는데, 다른 분들은 이미 다 아실듯한…

참고로 최종적으로 @Transactional(readOnly=True) 일 경우 Connection.setReadOnly를 호출하게 되는데, src/main/java/org/mariadb/jdbc/internal/failover/FailoverProxy.java 를 보게 되면 해당 함수가 호출되면 다음과 같은 작업을 하게 됩니다.

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      ......
      case METHOD_SET_READ_ONLY:
        this.listener.switchReadOnlyConnection((Boolean) args[0]);
      case METHOD_GET_READ_ONLY:
        return this.listener.isReadOnly();
      ......
   }

위의 코드를 보면 listener의 switchReadOnlyConnection 를 호출하는 것을 보실 수 있습니다.

switchReadOnlyConnection 은 Connection의 종류에 따라서 달라지는데, MasterFailoverListener 이냐, MasterReplicasListener.java 냐에 따라서 다르게 구현되어 있습니다.

MasterReplicasListener가 우리가 원하는 read-only(secondary)로 바꾸어 주는 부분이므로 src/main/java/org/mariadb/jdbc/internal/failover/impl/MastersReplicasListener.java 를 확인해봅니다.

  /**
   * Switch to a read-only(secondary) or read and write connection(master).
   *
   * @param mustBeReadOnly the read-only status asked
   * @throws SQLException if operation hasn't change protocol
   */
  @Override
  public void switchReadOnlyConnection(Boolean mustBeReadOnly) throws SQLException {
    checkWaitingConnection();
    if (currentReadOnlyAsked != mustBeReadOnly) {
      proxy.lock.lock();
      try {
        // another thread updated state
        if (currentReadOnlyAsked == mustBeReadOnly) {
          return;
        }
        currentReadOnlyAsked = mustBeReadOnly;
        if (currentReadOnlyAsked) {
          if (currentProtocol == null) {
            // switching to secondary connection
            currentProtocol = this.secondaryProtocol;
          } else if (currentProtocol.isMasterConnection()) {
            // must change to replica connection
            if (!isSecondaryHostFail()) {
              try {
                // switching to secondary connection
                syncConnection(this.masterProtocol, this.secondaryProtocol);
                currentProtocol = this.secondaryProtocol;
                // current connection is now secondary
                return;
              } catch (SQLException e) {
                // switching to secondary connection failed
                if (setSecondaryHostFail()) {
                  addToBlacklist(secondaryProtocol.getHostAddress());
                }
              }
            }
            // stay on master connection, since replica connection is fail
            FailoverLoop.addListener(this);
          }
        } else {
          if (currentProtocol == null) {
            // switching to master connection
            currentProtocol = this.masterProtocol;
          } else if (!currentProtocol.isMasterConnection()) {
            // must change to master connection
            if (!isMasterHostFail()) {
              try {
                // switching to master connection
                syncConnection(this.secondaryProtocol, this.masterProtocol);
                currentProtocol = this.masterProtocol;
                // current connection is now master
                return;
              } catch (SQLException e) {
                // switching to master connection failed
                if (setMasterHostFail()) {
                  addToBlacklist(masterProtocol.getHostAddress());
                }
              }
            } else if (urlParser.getOptions().allowMasterDownConnection) {
              currentProtocol = null;
              return;
            }

            try {
              reconnectFailedConnection(new SearchFilter(true, false));
              handleFailLoop();

            } catch (SQLException e) {
              // stop failover, since we will throw a connection exception that will close the
              // connection.
              FailoverLoop.removeListener(this);
              HostAddress failHost =
                  (this.masterProtocol != null) ? this.masterProtocol.getHostAddress() : null;
              throwFailoverMessage(
                  failHost, true, new SQLException("master connection failed"), false);
            }

            if (!isMasterHostFail()) {
              // connection established, no need to send Exception !
              // switching to master connection
              try {
                syncConnection(this.secondaryProtocol, this.masterProtocol);
                currentProtocol = this.masterProtocol;
              } catch (SQLException e) {
                // switching to master connection failed
                if (setMasterHostFail()) {
                  addToBlacklist(masterProtocol.getHostAddress());
                }
              }
            } else {
              currentReadOnlyAsked = !mustBeReadOnly;
              HostAddress failHost =
                  (this.masterProtocol != null) ? this.masterProtocol.getHostAddress() : null;
              throwFailoverMessage(
                  failHost, true, new SQLException("master connection failed"), false);
            }
          }
        }
      } finally {
        proxy.lock.unlock();
      }
    }
  }

[입 컨설팅] Self Managed Redis 가 좋을까? Managed Redis 가 좋을까?

다음 블로그는 Open Up의 도움을 받아서 작성되었습니다.

Redis 를 사용하는 방법은 여러가지가 있습니다. On-Premis 에서도 직접 서버에 Redis를 설치하고 이용하는 Self Managed 형태나 회사에서 제공하는 Managed 형태가 있습니다.(큰 N이나 K 같은 회사에서는 내부에서 관리해주는 일종의 Managed 형태로 Redis를 사용하고 있습니다.)

AWS와 같은 Cloud 서비스를 사용하는 경우에도 많은 경우 Managed 형태를 사용해야 하는지 아니면 Self-Managed 를 사용해야 하는지에 대한 고민을 많이 하게 됩니다. 그렇다면 어떤 걸 이용하는 것이 더 좋을까요?

결론부터 말하자면, 결국 Case By Case 입니다. 어떤 기준에서 선택하는가? 이 것을 쉽게 관리할 능력이 있는 사람이 있는가에 따라서 선택할 수 있는 옵션이 달라지게 됩니다.

일단 여기서는 AWS의 ElastiCache 와 EC2에서 직접 설치해서 사용하는 경우만 비교를 해야 할듯합니다.

먼저 가격에 대해서 비교를 해보도록 하겠습니다. 가장 민감한… 일단 가격 비교는 ap-northeast-2 아시아/서울 리전을 대상으로 했습니다.

사양AWS ElastiCache(USD/hour)EC2(USD/hour)
t4g.medium0.0940.0416
t3.medium0.0990.052
m6g.xlarge0.3630.188
m6g.16xlarge5.8063.008
r6g.16xlarge7.8623.904
r5.24xlarge12.4147.296

위의 가격은 ElastiCache를 사용하는 것과 EC2를 사용하는 것의 가격을 비교한 것입니다. 사양에 따라서 다르긴 한데, 대략 2배 정도의 가격 차이가 나는 것을 볼 수 있습니다.

두 번째는 CPU의 활용도 입니다. 위의 스펙의 사양을 보면 다음과 같습니다.

사양VCPUMem(GB)Mem/VCPU
t4g.medium242GB
t3.meduim242GB
m6g.xlarge4164GB
m6g.16xlarge642564GB
r6g.16xlarge645128GB
r5.24xlarge967688GB

Redis 는 기본적으로 Single Threaded 입니다. Redis 6나 AWS 의 ElastiCache 5.x에서 r5.2xlarge 부터는 내부적으로 Multi-Threaded IO를 이용해서 Read/Write 부분의 패킷처리를 한다고 하지만, 내부적으로 프로세스 처리의 기본은 Single Threaded 입니다. 즉 아무리 VCPU가 많아도 제대로 처리가 안될 수 있습니다.

그리고, ElastiCache 에서 어떻게 메모리를 관리하는지 알 수 없지만 NUMA아키텍처를 사용한다면 절반 이상의 메모리를 사용할 때 속도에 문제가 발생할 여지가 있습니다. (메모리를 절반 이하로 사용한다면 이슈가 없습니다.)

즉, vCPU를 활용하고 NUMA 이슈를 피하기 위해서, EC2에서 Redis를 직접적으로 운영한다면, CPU와 메모리를 더 효율적으로 사용할 수 있으므로, 좀 더 많은 Traffic을 이용할 수 있습니다. 이를 위해, 실제로 하나의 EC2 인스턴스에 여러 개의 Redis 인스턴스를 띄우게 됩니다. 용량이 클수록 적당히 많은 메모리를 할당하는 형태로 가지만, 하나의 Redis당 20GB 이상은 띄우지 않는게 관리가 용이합니다.

지금까지 Self-Managed 의 장점을 얘기했지만, 반대로 이 점들은 다시 Self-Managed의 단점이기도 합니다.

AWS의 ElastiCache는 기본적으로 도메인 기반의 Failover를 처리해줍니다. 예를 들어, charsyam-redis-master 라는 도메인을 이용하다가 장애가 발생하면, Replica 노드가 자동으로 charsyam-redis-master 라는 도메인을 가져가서, 클라이언트가 재접속 하는 것 만으로 Transparent 하게 Failover가 가능합니다.

예전에 카카오의 경우 RHA(Redis High Availabilty) 라는 기능을 만들어서 AWS의 ElastiCache 와 비슷한 방식의 Failover를 제공했습니다. 자세한 것은 https://tech.kakao.com/2016/03/18/redis-ha-dns/

즉, 직접적으로 EC2에서 Redis를 운영하신다면, 이런 부분을 직접적으로 관리를 해야 합니다. 적은 인력으로 이런 기능을 구현하고 관리하는 것은 쉬운 일이 아닙니다.

결과적으로 정리를 하면 비용적인 측면이 중요하다면, 그리고 충분한 관리 인력이 있다면 Self-Managed 가 더 효율적일 수 있습니다. 반대로 이런 부분에 대한 고민을 하지 않고, 다른 부분에 더 신경을 쓰고 싶다면 Managed Redis를 사용하는 것이 훨씬 더 유리하다고 생각합니다.

단순히 Failover 뿐만이 아니라, 모니터링이나 이런 부분에 대해서는 운영관리가 필요합니다.

만약에 저에게 너는 뭘 하고 싶냐고 하면, 저는 무조건 남이 관리해주는 거라고 말을 합니다. 즉, Managed가 훨씬 좋을듯 합니다.

[입 개발] Redis 에서 Redis Cluster 로 갈 때 주의해야할 부분들

Redis 를 사용하다 보면, 결국 데이터의 관리를 위해서 Redis Cluster 의 사용을 고민하게 됩니다. 그러면 Redis 를 사용하다가 Redis Cluster 로 가게 될 때 주의해야 할 부분은 어떤 것인지 얘기해보도록 하겠습니다.

  • Redis Cluster 에서는 DB 0만 사용할 수 있습니다.

Redis를 사용하는 많은 경우 select 명령을 통해서 DB를 사용하는 케이스가 많습니다. 예를 들어 0번은 일반적인 캐싱 관련 1번은 유저 관련, 2번은 Follower 관련 이런식으로 나눠서 데이터를 저장하게 됩니다.

코드를 보시면 다음과 같이 select 명령을 cluster 에서는 사용할 수 가 없습니다.

void selectCommand(client *c) {
    int id;

    if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
    if (selectDb(c,id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}

즉 다른 DB를 선택해서 저장할 수 없다는 것입니다.

이 문제를 해결하기 위해서는 전부 하나의 DB로 저장하고 적절히 prefix를 통해서 구분해야 합니다. 다만 여러 DB로 나눌 경우에는 하나의 DB내의 Key 전체를 스캔하는 시간이 줄어들게 되지만, 결국 전체를 scan 해야 하는 문제가 발생하긴 합니다.(적절히 Scan 명령을 잘 사용할 수 밖에 없습니다.)

  • Redis Cluster 에서는 SORT 명령을 사용할 수 없습니다.

정확하게 말하면 SORT 명령을 무조건 사용할 수 없다는 아닙니다. 그래서 더 문제가 됩니다. cluster 모드에서는 sort 의 BY 와 GET을 쓸 수 가 없습니다. 그런데 해당 기능들이 Redis를 활용하는 곳에서는 많이 활용하는 기능입니다. sortCommandGeneric 은 꽤 긴 함수이지만 중간 부분만 확인해 보시면 금방 막혀있는 부분을 확인하실 수 있습니다.

void sortCommandGeneric(client *c, int readonly) {
    list *operations;
    unsigned int outputlen = 0;
    int desc = 0, alpha = 0;
    long limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    int int_conversion_error = 0;
    int syntax_error = 0;
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Create a list of operations to perform for every sorted element.
     * Operations can be GET */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2; /* options start at argv[2] */

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL)
                 != C_OK) ||
                (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL)
                 != C_OK))
            {
                syntax_error++;
                break;
            }
            j+=2;
        } else if (readonly == 0 && !strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) {
                dontsort = 1;
            } else {
                /* If BY is specified with a real patter, we can't accept
                 * it in cluster mode. */
                if (server.cluster_enabled) {
                    addReplyError(c,"BY option of SORT denied in Cluster mode.");
                    syntax_error++;
                    break;
                }
            }
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            if (server.cluster_enabled) {
                addReplyError(c,"GET option of SORT denied in Cluster mode.");
                syntax_error++;
                break;
            }
            listAddNodeTail(operations,createSortOperation(
                SORT_OP_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            syntax_error++;
            break;
        }
        j++;
    }

    /* Handle syntax errors set during options parsing. */
    if (syntax_error) {
        listRelease(operations);
        return;
    }

    /* Lookup the key to sort. It must be of the right types */
    if (!storekey)
        sortval = lookupKeyRead(c->db,c->argv[1]);
    else
        sortval = lookupKeyWrite(c->db,c->argv[1]);
    if (sortval && sortval->type != OBJ_SET &&
                   sortval->type != OBJ_LIST &&
                   sortval->type != OBJ_ZSET)
    {
        listRelease(operations);
        addReplyErrorObject(c,shared.wrongtypeerr);
        return;
    }

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    if (sortval)
        incrRefCount(sortval);
    else
        sortval = createQuicklistObject();


    /* When sorting a set with no sort specified, we must sort the output
     * so the result is consistent across scripting and replication.
     *
     * The other types (list, sorted set) will retain their native order
     * even if no sort order is requested, so they remain stable across
     * scripting and replication. */
    if (dontsort &&
        sortval->type == OBJ_SET &&
        (storekey || c->flags & CLIENT_LUA))
    {
        /* Force ALPHA sorting */
        dontsort = 0;
        alpha = 1;
        sortby = NULL;
    }
    /* Destructively convert encoded sorted sets for SORT. */
    if (sortval->type == OBJ_ZSET)
        zsetConvert(sortval, OBJ_ENCODING_SKIPLIST);

    /* Obtain the length of the object to sort. */
    switch(sortval->type) {
    case OBJ_LIST: vectorlen = listTypeLength(sortval); break;
    case OBJ_SET: vectorlen =  setTypeSize(sortval); break;
    case OBJ_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
    default: vectorlen = 0; serverPanic("Bad SORT type"); /* Avoid GCC warning */
    }

    /* Perform LIMIT start,count sanity checking. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    /* Whenever possible, we load elements into the output array in a more
     * direct way. This is possible if:
     *
     * 1) The object to sort is a sorted set or a list (internally sorted).
     * 2) There is nothing to sort as dontsort is true (BY <constant string>).
     *
     * In this special case, if we have a LIMIT option that actually reduces
     * the number of elements to fetch, we also optimize to just load the
     * range we are interested in and allocating a vector that is big enough
     * for the selected range length. */
    if ((sortval->type == OBJ_ZSET || sortval->type == OBJ_LIST) &&
        dontsort &&
        (start != 0 || end != vectorlen-1))
    {
        vectorlen = end-start+1;
    }

    /* Load the sorting vector with all the objects to sort */
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;

    if (sortval->type == OBJ_LIST && dontsort) {
        /* Special handling for a list, if 'dontsort' is true.
         * This makes sure we return elements in the list original
         * ordering, accordingly to DESC / ASC options.
         *
         * Note that in this case we also handle LIMIT here in a direct
         * way, just getting the required range, as an optimization. */
        if (end >= start) {
            listTypeIterator *li;
            listTypeEntry entry;
            li = listTypeInitIterator(sortval,
                    desc ? (long)(listTypeLength(sortval) - start - 1) : start,
                    desc ? LIST_HEAD : LIST_TAIL);

            while(j < vectorlen && listTypeNext(li,&entry)) {
                vector[j].obj = listTypeGet(&entry);
                vector[j].u.score = 0;
                vector[j].u.cmpobj = NULL;
                j++;
            }
            listTypeReleaseIterator(li);
            /* Fix start/end: output code is not aware of this optimization. */
            end -= start;
            start = 0;
        }
    } else if (sortval->type == OBJ_LIST) {
        listTypeIterator *li = listTypeInitIterator(sortval,0,LIST_TAIL);
        listTypeEntry entry;
        while(listTypeNext(li,&entry)) {
            vector[j].obj = listTypeGet(&entry);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        listTypeReleaseIterator(li);
    } else if (sortval->type == OBJ_SET) {
        setTypeIterator *si = setTypeInitIterator(sortval);
        sds sdsele;
        while((sdsele = setTypeNextObject(si)) != NULL) {
            vector[j].obj = createObject(OBJ_STRING,sdsele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        setTypeReleaseIterator(si);
    } else if (sortval->type == OBJ_ZSET && dontsort) {
        /* Special handling for a sorted set, if 'dontsort' is true.
         * This makes sure we return elements in the sorted set original
         * ordering, accordingly to DESC / ASC options.
         *
         * Note that in this case we also handle LIMIT here in a direct
         * way, just getting the required range, as an optimization. */

        zset *zs = sortval->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds sdsele;
        int rangelen = vectorlen;

        /* Check if starting point is trivial, before doing log(N) lookup. */
        if (desc) {
            long zsetlen = dictSize(((zset*)sortval->ptr)->dict);

            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl,zsetlen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }

        while(rangelen--) {
            serverAssertWithInfo(c,sortval,ln != NULL);
            sdsele = ln->ele;
            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
            ln = desc ? ln->backward : ln->level[0].forward;
        }
        /* Fix start/end: output code is not aware of this optimization. */
        end -= start;
        start = 0;
    } else if (sortval->type == OBJ_ZSET) {
        dict *set = ((zset*)sortval->ptr)->dict;
        dictIterator *di;
        dictEntry *setele;
        sds sdsele;
        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            sdsele =  dictGetKey(setele);
            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    } else {
        serverPanic("Unknown type");
    }
    serverAssertWithInfo(c,sortval,j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector */
    if (!dontsort) {
        for (j = 0; j < vectorlen; j++) {
            robj *byval;
            if (sortby) {
                /* lookup value to sort by */
                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj,storekey!=NULL);
                if (!byval) continue;
            } else {
                /* use object itself to sort by */
                byval = vector[j].obj;
            }

            if (alpha) {
                if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
            } else {
                if (sdsEncodedObject(byval)) {
                    char *eptr;

                    vector[j].u.score = strtod(byval->ptr,&eptr);
                    if (eptr[0] != '\0' || errno == ERANGE ||
                        isnan(vector[j].u.score))
                    {
                        int_conversion_error = 1;
                    }
                } else if (byval->encoding == OBJ_ENCODING_INT) {
                    /* Don't need to decode the object if it's
                     * integer-encoded (the only encoding supported) so
                     * far. We can just cast it */
                    vector[j].u.score = (long)byval->ptr;
                } else {
                    serverAssertWithInfo(c,sortval,1 != 1);
                }
            }

            /* when the object was retrieved using lookupKeyByPattern,
             * its refcount needs to be decreased. */
            if (sortby) {
                decrRefCount(byval);
            }
        }


        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        server.sort_store = storekey ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET/DEL/INCR/DECR operations if any. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (int_conversion_error) {
        addReplyError(c,"One or more scores can't be converted into double");
    } else if (storekey == NULL) {
        /* STORE option not specified, sent the sorting result to client */
        addReplyArrayLen(c,outputlen);
        for (j = start; j <= end; j++) {
            listNode *ln;
            listIter li;

            if (!getop) addReplyBulk(c,vector[j].obj);
            listRewind(operations,&li);
            while((ln = listNext(&li))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj,storekey!=NULL);

                if (sop->type == SORT_OP_GET) {
                    if (!val) {
                        addReplyNull(c);
                    } else {
                        addReplyBulk(c,val);
                        decrRefCount(val);
                    }
                } else {
                    /* Always fails */
                    serverAssertWithInfo(c,sortval,sop->type == SORT_OP_GET);
                }
            }
        }
    } else {
        robj *sobj = createQuicklistObject();

        /* STORE option specified, set the sorting result as a List object */
        for (j = start; j <= end; j++) {
            listNode *ln;
            listIter li;

            if (!getop) {
                listTypePush(sobj,vector[j].obj,LIST_TAIL);
            } else {
                listRewind(operations,&li);
                while((ln = listNext(&li))) {
                    redisSortOperation *sop = ln->value;
                    robj *val = lookupKeyByPattern(c->db,sop->pattern,
                        vector[j].obj,storekey!=NULL);

                    if (sop->type == SORT_OP_GET) {
                        if (!val) val = createStringObject("",0);

                        /* listTypePush does an incrRefCount, so we should take care
                         * care of the incremented refcount caused by either
                         * lookupKeyByPattern or createStringObject("",0) */
                        listTypePush(sobj,val,LIST_TAIL);
                        decrRefCount(val);
                    } else {
                        /* Always fails */
                        serverAssertWithInfo(c,sortval,sop->type == SORT_OP_GET);
                    }
                }
            }
        }
        if (outputlen) {
            setKey(c,c->db,storekey,sobj);
            notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey,
                                c->db->id);
            server.dirty += outputlen;
        } else if (dbDelete(c->db,storekey)) {
            signalModifiedKey(c,c->db,storekey);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
            server.dirty++;
        }
        decrRefCount(sobj);
        addReplyLongLong(c,outputlen);
    }
    /* Cleanup */
    for (j = 0; j < vectorlen; j++)
        decrRefCount(vector[j].obj);

    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}

BY, GET을 SORT에서 쓸 수 없는 것은, BY, GET이 다른 KEY를 가져오는 기능을 사용하기 때문입니다. Cluster 에서는 KEY가 여러 서버에 분산되어 있는데, Redis Cluster에서 서로 KEY를 필요에 따라서 가져오는 것은 허용하지 않는 정책을 쓰고 있습니다. 즉 해당 이슈를 해결하기 위해서는 사용자가 직접 해당 KEY들을 가져와서 필요한 연산을 해줘야 합니다.

  • Multi Key 를 사용하는 명령들은 대부분 같은 slot 에 있을 때만 동작합니다.

Cluster 에서 데이터를 각 노드로 분배하는 것은 hash(key) % 16384 이런 식으로 슬롯이 구분되고, 해당 슬롯이 각 노드에 속하게 됩니다. 그래서 기본적으로 Multi Key에서는 각 Key가 다른 slot 에 속해서, 다른 서버에 속할 수 있습니다.

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(!c->cmd->movablekeys && c->cmd->key_specs_num == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            c->cmd->rejected_calls++;
            return C_OK;
        }
    }

getNodeByQuery 에서 실제로 error code를 채우고 거기에 따라서 clusterRedirectClient 에서 에러 값을 만들어주게 됩니다.

/* Send the client the right redirection code, according to error_code
 * that should be set to one of CLUSTER_REDIR_* macros.
 *
 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
 * are used, then the node 'n' should not be NULL, but should be the
 * node we want to mention in the redirection. Moreover hashslot should
 * be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
    } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
        /* The request spawns multiple keys in the same slot,
         * but the slot is not "stable" currently as there is
         * a migration or import in progress. */
        addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        addReplyError(c,"-CLUSTERDOWN The cluster is down");
    } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
        addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
    } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        addReplyError(c,"-CLUSTERDOWN Hash slot not served");
    } else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    {
        /* Redirect to IP:port. Include plaintext port if cluster is TLS but
         * client is non-TLS. */
        int use_pport = (server.tls_cluster &&
                         c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
        int port = use_pport && n->pport ? n->pport : n->port;
        addReplyErrorSds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot, n->ip, port));
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
}

보통 Multi Key는 같은 slot 에 있는 key 들만 허용이 되므로, 이를 위해서는 hash tag를 잘 이용해서 같은 slot에 배치하는 것이 중요합니다. 키 이름에 {…} 이런식으로 붙여주면 해당 값을 Hash하게 되므로 같은 slot 에 데이터를 추가할 수 있습니다. 문제는 또 이런식으로 같은 slot에 많은 데이터가 들어가면 데이터의 분산이 어려워지게 되므로 주의해야 합니다.

의외로 이런 이유로 Cluster 모드로의 이전에 많은 고생을 하게 됩니다. 그래서 아예 처음에 시작할 때 Redis Cluster 를 한 셋만 구성해서 사용하는 방법도 있습니다. Primary-Replica 한 셋으로만 구성해서 시작하면, 어차피 제약은 기존 Cluster 와 동일하기 때문에, 나중에 서버를 추가할 때 큰 문제 없이 추가할 수 도 있습니다.(그러나 위의 좋은 기능들을 못쓰는건 마찬가지라는…)

[입 개발] Spark SQL Query to Snowflake Query

Data Engineering 을 하다보면, 여러가지 툴을 사용하게 되는데, 그러게 되면서 생기는 필수적인 상황이, 툴에 따른 쿼리의 변환입니다. Hive Query를 사용했다면야, Spark으로 바꿀때는 바로 전환이 되니, 아무런 걱정이 없지만, Redshift라든지, SQL만 해도 Mysql과 다른 RDBMS의 쿼리들이 많이 다릅니다. 특히 요새 가장 인기가 있는 Data Engineer Tool이라면, Databricks나 Snowflake가 있을듯 합니다.(현재는 서로 영역이 좀 다르지만, 곧, 피터지는 싸움이 일어날 것으로 보입니다.)

그런데 Spark SQL을 Snowflake SQL로 바꾸거나, 반대로 바꿀려면 하면 쿼리가 조금 다릅니다. 그런데 이럴 때 리서치를 하다보니 신기한 툴이 있었습니다. phdata(https://www.phdata.io) 를 보면 SQLMorph(https://www.phdata.io/blog/sqlmorph-free-sql-translator-to-snowflake/) 라는 툴을 통해서 이렇게 Query Translation을 시켜준다고 합니다. 바로 사용해보니 뭔가 되는 거 같은… 심지어 Web UI와 Python Tool 까지 제공해 줍니다.(Python Tool은 Snowflake와 몇 개의 DBMS를 사용할 수 없는데, 간단한 잡시도로 사용할 수 있긴 합니다.)

그런데 다 되는지 알고 좋아했는데, 실제로 Spark SQL을 Snowflake로 바꾸면, 제대로 바뀌지 않습니다. 예를 들어 Spark SQL에 Structured Type 을 다 flatten 해주는 explode라는 함수가 있는데, 이런 건 변화를 해주지 않습니다. 특히 snowflake 의 flatten 은 사용하는 문법도 달라서 흑흑흑…

그래서 간단히 바뀌는 문법들만 공유하려고 합니다. 여기서 explode 와 get_json_object 형태가 조금 다르지만 아마도 금방 수정할 수 있을 꺼라고 생각합니다.

Spark SQLSnowflake
ARRAYARRAY_CONSTRUCT
explodeflatten
get_json_objectparse_json
date interval dateadd
regexp_extractregexp_substr
collect_setarray_agg