[입 개발] Redis Pub/Sub 시스템은 일반적인 Message Queue와 다르다.

Redis Issue 에 재미있는 질문이 올라왔습니다. https://github.com/antirez/redis/issues/998 Redis Pub/Sub 에 maximum 값을 설정하고 싶다는 것입니다. 아마도 이 분은 Redis Pub/Sub을 일종의 Message Queue로 착각하고 있는 것입니다.

뭐, 처음부터 집고 넘어가자면, 당연히 Pub/Sub System과 Message Queue는 다른 것입니다. 먼저, Pub/Sub System은 현재 채널에 가입한 Subscriber 들 모두에게 특정 이벤트를 전달하는 것입니다. 그리고 메시지 큐는 보통 일반적으로 작업을 큐잉하고 있다가 요청하는 곳에 데이터를 전달하기 위해서 보관하는 시스템입니다. 그런데 일반적으로 Pub/Sub을 제공하는 시스템들이, 일종의 메시지 큐처럼 데이터를 보관하는 역할을 한다는 것이, 단순히 Redis를 사용할 경우, 착각하게 됩니다.

redis에서 어떤 client 도 subscribe 하지 않은 경우 publish 하게 되면 다음과 같은 결과를 얻게 됩니다.

redis 127.0.0.1:6379> publish abc 123
(integer) 0

리턴값은 몇개의 subscriber 에게 메시지를 전달했다는 결과를 나타냅니다. 즉, redis를 가지고 단순히 미리 publish 해두고 나중에 다른 subscriber가 subscribe 할 경우에는 앞에 publish한 데이터는 전부 유실되게 됩니다.

해당 publish 코드를 살펴보더라도, 단순히 channel을 찾아서 연결된 클라이언트 정보만 체크하지 따로 저장하는 부분은 없습니다. 즉 딱 기본적인 pub/sub 에만 집중한 형태입니다.

/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}
Advertisements