[입 개발] Redis 7.x 에서의 ShardedPubSub

오래간만에 Redis 7에서 도입될 Sharded PubSub 에 대해서 좀 분석을 해보게 되었습니다. Sharded PubSub 은 기존 Redis Cluster 에서의 PubSub 의 단점을 해결하기 위해서 도입된 기술입니다. 먼저 Sharded Pub/Sub을 이야기하기 전에 Redis Cluster에서의 PubSub을 알아보고 단점을 먼저 확인해봅시다.

기본적으로 Redis Cluster 에서의 Pub/Sub은 모든 노드에 데이터를 뿌리게 됩니다. 그래서 Client #1이 한대의 서버에 publish를 하면 해당 노드는 모든 노드(Primary + Replica)들에 publish를 broadcast 하게 되고, 어떤 노드에 subscribe를 하든 해당 broadcast 되는 메시지를 받을 수 있습니다.

이 broadcast 는 클라이언트가 Primary에 subscribe 를 하든 Replica 에 subscribe 를 하든 상관없습니다. 물론 publish 역시 상관이 없습니다.

그리고 subscribe 하는 클라이언트는 해당 서버에 접속을 하게 되죠. 일단 Redis에서의 pub/sub의 메시지 전달은 at most once 입니다. 즉, 최대 한번, 또는 못받을 수가 있는 구조죠.

그럼 뭐가 성능상 문제냐? 라고 물으신다면, 항상 broadcast 하는 것이 문제입니다. 안그래도 Redis Pub/Sub 자체가 pattern 이라는 걸 지원해서, 모든 채널을 확인해야 하는 이슈가 있기 때문에, 보통 메시지 전달은 채널수 + 채널에 붙은 클라이언트 수 만큼 루프를 돌아야 합니다. 항상 모든 서버에 일단 broadcast 해야 한다는게 그래서 전체 성능에 영향을 미칠 수 가 있는거죠.(Redis는 Single Threaded 라 항상 오래 동작하는 기능은 조심해야 하는…), 클러스터가 커지면 커질수록 문제가 더 발생할 수 있습니다.

그럼 Sharded Pub/Sub은 뭐냐? 말 그대로입니다. Sharded 해서 특정 노드에만 해당 채널의 정보를 publish 하도록 하자가 됩니다. 이 구조는 보통 다음과 같습니다.

이는 기존 Redis Cluster 의 특성을 그대로 이용합니다. 일반적으로 key를 crc16으로 Hash 해서 해당 키가 속한 slot 을 처리하는 서버로만 메시지를 전달하게 되는데 (-MOVED 를 이용합니다.), 이 특성을 이용해서 Pub/Sub도 하나의 Shard 군에서만 처리하자 입니다.(제가 노드가 아니라 Shard 군이라고 한 것에 주의하세요.)

먼저 코드부터 확인해보도록 하겠습니다. pubsubtype 이라는 구조체가 ShardedPubSub을 지원하기 위해서 도입이 되었습니다.

typedef struct pubsubtype {
    int shard;
    dict *(*clientPubSubChannels)(client*);
    int (*subscriptionCount)(client*);
    dict **serverPubSubChannels;
    robj **subscribeMsg;
    robj **unsubscribeMsg;
}pubsubtype;

위의 pubsubtype 으로 두 개의 타입이 생성되어 있음. 기본인 pubSubType 과 ShardedPubSub을 위한 pubSubShardType 은 다음과 같습니다.

pubsubtype pubSubType = {
    .shard = 0,
    .clientPubSubChannels = getClientPubSubChannels,
    .subscriptionCount = clientSubscriptionsCount,
    .serverPubSubChannels = &server.pubsub_channels,
    .subscribeMsg = &shared.subscribebulk,
    .unsubscribeMsg = &shared.unsubscribebulk,
};

/*
 * Pub/Sub type for shard level channels bounded to a slot.
 */
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk
};

이를 지원하기 위해서 redisServer 구조체에도 sharded channels을 구독하는 client의 정보를 저장하고 있는 변수가 추가됨.

dict *pubsubshard_channels;  /* Map channels to list of subscribed clients */
int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */

client 구조체에도 이를 지원하기 위한 변수가 추가됨

dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */

여기서 주의해서 봐야 하는 것은 아래처럼 pubsub_channels 과 pubsubshard_channels 가 keylistDictType 으로 구성되어 있다는 것입니다.

server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
server.pubsubshard_channels = dictCreate(&keylistDictType);

getClientPubSubChannels 와 getClientPubSubShardChannels 는 아래와 같이 서로 다른 변수를 전달함.

dict* getClientPubSubChannels(client *c) {
    return c->pubsub_channels;
}

dict* getClientPubSubShardChannels(client *c) {
    return c->pubsubshard_channels;
}

clientSubscriptionsCount 와 clientShardSubscriptionsCount 는 다음과 같다.

/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
    return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
}

/* Return the number of shard level channels a client is subscribed to. */
int clientShardSubscriptionsCount(client *c) {
    return dictSize(c->pubsubshard_channels);
}

먼저 subscribe 하는 코드를 살펴보면 다음과 같다. sharded pubsub 을 위해서 ssubscribe 라는 명령이 존재한다. 코드를 보면 cluster_enabled 상태에서는 해당 채널이 등록되어 있지 않으면 slotToChannelAdd 명령을 통해서 cluster 의 slot_to_channels 에 추가하게 됩니다.

/* SSUBSCRIBE channel [channel ...] */
void ssubscribeCommand(client *c) {
    if (c->flags & CLIENT_DENY_BLOCKING) {
        /* A client that has CLIENT_DENY_BLOCKING flag on
         * expect a reply per command and so can not execute subscribe. */
        addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
        return;
    }

    for (int j = 1; j < c->argc; j++) {
        /* A channel is only considered to be added, if a
         * subscriber exists for it. And if a subscriber
         * already exists the slotToChannel doesn't needs
         * to be incremented. */
        if (server.cluster_enabled &
            (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
            slotToChannelAdd(c->argv[j]->ptr);
        }
        pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
    }
    c->flags |= CLIENT_PUBSUB;
}

그리고

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(*type.serverPubSubChannels, channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel,type);
    return retval;
}

publish 는 spublish 라는 전용 커맨드가 있습니다. publishCommand 는 clusterPropagatePublish 라는 모든 Shard에 데이터를 보내는 것과 달리 spublishCommand 는 clusterPropagatePublishShard 함수는 해당 shard 에만 메세지를 보내게 됩니다.(이 부분이 가장 큰 차이입니다.) – 기존의 Pub/Sub은 모든 노드에 데이터를 보내기 때문에 아무 노드에만 Subscribe 를 해도 메시지를 전달 받을 수 있었지만, Sharded Pub/Sub에서는 이게 불가능합니다.

/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
    int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
    if (server.cluster_enabled) {
        clusterPropagatePublishShard(c->argv[1], c->argv[2]);
    } else {
        forceCommandPropagation(c,PROPAGATE_REPL);
    }
    addReplyLongLong(c,receivers);
}

여기서 살짝 publishCommand 함수를 살펴보면(Sharded PubSub이 아닌 경우) 다음과 같습니다. 거의 동일합니다.

/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
    if (server.sentinel_mode) {
        sentinelPublishCommand(c);
        return;
    }

    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

pubsubPublishMessageInternal 는 세 번째 파라매터로 처음에 소개한 pubsubtype 을 받게 되고 코드는 다음과 같습니다. pattern 을 사용하는 방식은 여기 코드에서도 나오지만 shard type을 지원하지 않습니다. 이는 pattern형식은 전체 노드를 subscribe 하지 않고서는 지원할 수 없기 때문입니다. (패턴의 hash 값이 어떤 slot에 들어갈지를 알 수 없음.)

/*
 * Publish a message to all the subscribers.
 */
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(*type.serverPubSubChannels, channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message);
            updateClientMemUsage(c);
            receivers++;
        }
    }

    if (type.shard) {
        /* Shard pubsub ignores patterns. */
        return receivers;
    }

    /* Send to clients listening to matching channels */
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
        channel = getDecodedObject(channel);
        while((de = dictNext(di)) != NULL) {
            robj *pattern = dictGetKey(de);
            list *clients = dictGetVal(de);
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;

            listRewind(clients,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                updateClientMemUsage(c);
                receivers++;
            }
        }
        decrRefCount(channel);
        dictReleaseIterator(di);
    }

여기서 볼 부분은 실제로 위의 해당 channel 이 속한 cluster 로 clusterSendPublish를 통해서 전달되게 됩니다. 여기서 clusterGetNodesServingMySlots 에서는 해당 slot 을 가진 primary 와 replica를 모두 전달받게 됩니다. 즉, 보통 최소 2개 정도가 정상입니다.

/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub shard support
 *
 * Publish this message across the slot (primary/replica).
 * -------------------------------------------------------------------------- */
void clusterPropagatePublishShard(robj *channel, robj *message) {
    list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
    if (listLength(nodes_for_slot) != 0) {
        listIter li;
        listNode *ln;
        listRewind(nodes_for_slot, &li);
        while((ln = listNext(&li))) {
            clusterNode *node = listNodeValue(ln);
            if (node != myself) {
                clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
            }
        }
    }
    listRelease(nodes_for_slot);
}

clusterSendPublish 함수는 다음과 같이 publish 메세지를 해당 slot 을 가진 노드에 전달합니다.

/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster.
 *
 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
 * As all the struct is used as a buffer, when more than 8 bytes are copied into
 * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
 * positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
    unsigned char *payload;
    clusterMsg buf[1];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,type);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = (unsigned char*)buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)buf) zfree(payload);
}

그래서 Sharded Pub/Sub 은 ssubscribe, spublish 등의 명령을 가지고 있습니다. 그래서 테스트를 해보면 다음과 같은 결과를 볼 수 있습니다. ssubscribe test 를 하면 7003 으로 -MOVED가 전송됩니다. 7003으로 접속해서 다시 ssubscribe를 하면 성공함을 볼 수 있습니다.

➜  redis git:(unstable) ✗ telnet 0 7005
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  redis git:(unstable) ✗ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
ssubscribe test
*3
$10
ssubscribe
$4
test
:1

spublish 도 마찬가지 입니다.

➜  ~  telnet 0 7002
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
-MOVED 6918 127.0.0.1:7003
quit
+OK
Connection closed by foreign host.
➜  ~ telnet 0 7003
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
spublish test 123
:1

실제로 spublish 를 하면 이제 해당 shard 군만 받게 되는 것을 볼 수 있습니다. 이로 인해서 broadcast 를 줄여서, 실제 부하를 줄이겠다는게 Redis 7.x에서 나오는 Sharded Pub/Sub 입니다. 다만 제가 보기에는 Sharded Pub/Sub도 명확한 한계가 있는 방식입니다. 물론, 이전의 방식이 더 좋다는 것은 아니지만, Redis 가 진짜 대규모의 Pub/Sub을 처리하기 위해서는 좀 더 많은 구조적인 개선이 필요할듯 합니다.