[입 개발] Redis 에서 Redis Cluster 로 갈 때 주의해야할 부분들

Redis 를 사용하다 보면, 결국 데이터의 관리를 위해서 Redis Cluster 의 사용을 고민하게 됩니다. 그러면 Redis 를 사용하다가 Redis Cluster 로 가게 될 때 주의해야 할 부분은 어떤 것인지 얘기해보도록 하겠습니다.

  • Redis Cluster 에서는 DB 0만 사용할 수 있습니다.

Redis를 사용하는 많은 경우 select 명령을 통해서 DB를 사용하는 케이스가 많습니다. 예를 들어 0번은 일반적인 캐싱 관련 1번은 유저 관련, 2번은 Follower 관련 이런식으로 나눠서 데이터를 저장하게 됩니다.

코드를 보시면 다음과 같이 select 명령을 cluster 에서는 사용할 수 가 없습니다.

void selectCommand(client *c) {
    int id;

    if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
    if (selectDb(c,id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}

즉 다른 DB를 선택해서 저장할 수 없다는 것입니다.

이 문제를 해결하기 위해서는 전부 하나의 DB로 저장하고 적절히 prefix를 통해서 구분해야 합니다. 다만 여러 DB로 나눌 경우에는 하나의 DB내의 Key 전체를 스캔하는 시간이 줄어들게 되지만, 결국 전체를 scan 해야 하는 문제가 발생하긴 합니다.(적절히 Scan 명령을 잘 사용할 수 밖에 없습니다.)

  • Redis Cluster 에서는 SORT 명령을 사용할 수 없습니다.

정확하게 말하면 SORT 명령을 무조건 사용할 수 없다는 아닙니다. 그래서 더 문제가 됩니다. cluster 모드에서는 sort 의 BY 와 GET을 쓸 수 가 없습니다. 그런데 해당 기능들이 Redis를 활용하는 곳에서는 많이 활용하는 기능입니다. sortCommandGeneric 은 꽤 긴 함수이지만 중간 부분만 확인해 보시면 금방 막혀있는 부분을 확인하실 수 있습니다.

void sortCommandGeneric(client *c, int readonly) {
    list *operations;
    unsigned int outputlen = 0;
    int desc = 0, alpha = 0;
    long limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    int int_conversion_error = 0;
    int syntax_error = 0;
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Create a list of operations to perform for every sorted element.
     * Operations can be GET */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2; /* options start at argv[2] */

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL)
                 != C_OK) ||
                (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL)
                 != C_OK))
            {
                syntax_error++;
                break;
            }
            j+=2;
        } else if (readonly == 0 && !strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) {
                dontsort = 1;
            } else {
                /* If BY is specified with a real patter, we can't accept
                 * it in cluster mode. */
                if (server.cluster_enabled) {
                    addReplyError(c,"BY option of SORT denied in Cluster mode.");
                    syntax_error++;
                    break;
                }
            }
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            if (server.cluster_enabled) {
                addReplyError(c,"GET option of SORT denied in Cluster mode.");
                syntax_error++;
                break;
            }
            listAddNodeTail(operations,createSortOperation(
                SORT_OP_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            syntax_error++;
            break;
        }
        j++;
    }

    /* Handle syntax errors set during options parsing. */
    if (syntax_error) {
        listRelease(operations);
        return;
    }

    /* Lookup the key to sort. It must be of the right types */
    if (!storekey)
        sortval = lookupKeyRead(c->db,c->argv[1]);
    else
        sortval = lookupKeyWrite(c->db,c->argv[1]);
    if (sortval && sortval->type != OBJ_SET &&
                   sortval->type != OBJ_LIST &&
                   sortval->type != OBJ_ZSET)
    {
        listRelease(operations);
        addReplyErrorObject(c,shared.wrongtypeerr);
        return;
    }

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    if (sortval)
        incrRefCount(sortval);
    else
        sortval = createQuicklistObject();


    /* When sorting a set with no sort specified, we must sort the output
     * so the result is consistent across scripting and replication.
     *
     * The other types (list, sorted set) will retain their native order
     * even if no sort order is requested, so they remain stable across
     * scripting and replication. */
    if (dontsort &&
        sortval->type == OBJ_SET &&
        (storekey || c->flags & CLIENT_LUA))
    {
        /* Force ALPHA sorting */
        dontsort = 0;
        alpha = 1;
        sortby = NULL;
    }
    /* Destructively convert encoded sorted sets for SORT. */
    if (sortval->type == OBJ_ZSET)
        zsetConvert(sortval, OBJ_ENCODING_SKIPLIST);

    /* Obtain the length of the object to sort. */
    switch(sortval->type) {
    case OBJ_LIST: vectorlen = listTypeLength(sortval); break;
    case OBJ_SET: vectorlen =  setTypeSize(sortval); break;
    case OBJ_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
    default: vectorlen = 0; serverPanic("Bad SORT type"); /* Avoid GCC warning */
    }

    /* Perform LIMIT start,count sanity checking. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    /* Whenever possible, we load elements into the output array in a more
     * direct way. This is possible if:
     *
     * 1) The object to sort is a sorted set or a list (internally sorted).
     * 2) There is nothing to sort as dontsort is true (BY <constant string>).
     *
     * In this special case, if we have a LIMIT option that actually reduces
     * the number of elements to fetch, we also optimize to just load the
     * range we are interested in and allocating a vector that is big enough
     * for the selected range length. */
    if ((sortval->type == OBJ_ZSET || sortval->type == OBJ_LIST) &&
        dontsort &&
        (start != 0 || end != vectorlen-1))
    {
        vectorlen = end-start+1;
    }

    /* Load the sorting vector with all the objects to sort */
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;

    if (sortval->type == OBJ_LIST && dontsort) {
        /* Special handling for a list, if 'dontsort' is true.
         * This makes sure we return elements in the list original
         * ordering, accordingly to DESC / ASC options.
         *
         * Note that in this case we also handle LIMIT here in a direct
         * way, just getting the required range, as an optimization. */
        if (end >= start) {
            listTypeIterator *li;
            listTypeEntry entry;
            li = listTypeInitIterator(sortval,
                    desc ? (long)(listTypeLength(sortval) - start - 1) : start,
                    desc ? LIST_HEAD : LIST_TAIL);

            while(j < vectorlen && listTypeNext(li,&entry)) {
                vector[j].obj = listTypeGet(&entry);
                vector[j].u.score = 0;
                vector[j].u.cmpobj = NULL;
                j++;
            }
            listTypeReleaseIterator(li);
            /* Fix start/end: output code is not aware of this optimization. */
            end -= start;
            start = 0;
        }
    } else if (sortval->type == OBJ_LIST) {
        listTypeIterator *li = listTypeInitIterator(sortval,0,LIST_TAIL);
        listTypeEntry entry;
        while(listTypeNext(li,&entry)) {
            vector[j].obj = listTypeGet(&entry);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        listTypeReleaseIterator(li);
    } else if (sortval->type == OBJ_SET) {
        setTypeIterator *si = setTypeInitIterator(sortval);
        sds sdsele;
        while((sdsele = setTypeNextObject(si)) != NULL) {
            vector[j].obj = createObject(OBJ_STRING,sdsele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        setTypeReleaseIterator(si);
    } else if (sortval->type == OBJ_ZSET && dontsort) {
        /* Special handling for a sorted set, if 'dontsort' is true.
         * This makes sure we return elements in the sorted set original
         * ordering, accordingly to DESC / ASC options.
         *
         * Note that in this case we also handle LIMIT here in a direct
         * way, just getting the required range, as an optimization. */

        zset *zs = sortval->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds sdsele;
        int rangelen = vectorlen;

        /* Check if starting point is trivial, before doing log(N) lookup. */
        if (desc) {
            long zsetlen = dictSize(((zset*)sortval->ptr)->dict);

            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl,zsetlen-start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl,start+1);
        }

        while(rangelen--) {
            serverAssertWithInfo(c,sortval,ln != NULL);
            sdsele = ln->ele;
            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
            ln = desc ? ln->backward : ln->level[0].forward;
        }
        /* Fix start/end: output code is not aware of this optimization. */
        end -= start;
        start = 0;
    } else if (sortval->type == OBJ_ZSET) {
        dict *set = ((zset*)sortval->ptr)->dict;
        dictIterator *di;
        dictEntry *setele;
        sds sdsele;
        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            sdsele =  dictGetKey(setele);
            vector[j].obj = createStringObject(sdsele,sdslen(sdsele));
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    } else {
        serverPanic("Unknown type");
    }
    serverAssertWithInfo(c,sortval,j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector */
    if (!dontsort) {
        for (j = 0; j < vectorlen; j++) {
            robj *byval;
            if (sortby) {
                /* lookup value to sort by */
                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj,storekey!=NULL);
                if (!byval) continue;
            } else {
                /* use object itself to sort by */
                byval = vector[j].obj;
            }

            if (alpha) {
                if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
            } else {
                if (sdsEncodedObject(byval)) {
                    char *eptr;

                    vector[j].u.score = strtod(byval->ptr,&eptr);
                    if (eptr[0] != '\0' || errno == ERANGE ||
                        isnan(vector[j].u.score))
                    {
                        int_conversion_error = 1;
                    }
                } else if (byval->encoding == OBJ_ENCODING_INT) {
                    /* Don't need to decode the object if it's
                     * integer-encoded (the only encoding supported) so
                     * far. We can just cast it */
                    vector[j].u.score = (long)byval->ptr;
                } else {
                    serverAssertWithInfo(c,sortval,1 != 1);
                }
            }

            /* when the object was retrieved using lookupKeyByPattern,
             * its refcount needs to be decreased. */
            if (sortby) {
                decrRefCount(byval);
            }
        }


        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        server.sort_store = storekey ? 1 : 0;
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET/DEL/INCR/DECR operations if any. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (int_conversion_error) {
        addReplyError(c,"One or more scores can't be converted into double");
    } else if (storekey == NULL) {
        /* STORE option not specified, sent the sorting result to client */
        addReplyArrayLen(c,outputlen);
        for (j = start; j <= end; j++) {
            listNode *ln;
            listIter li;

            if (!getop) addReplyBulk(c,vector[j].obj);
            listRewind(operations,&li);
            while((ln = listNext(&li))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj,storekey!=NULL);

                if (sop->type == SORT_OP_GET) {
                    if (!val) {
                        addReplyNull(c);
                    } else {
                        addReplyBulk(c,val);
                        decrRefCount(val);
                    }
                } else {
                    /* Always fails */
                    serverAssertWithInfo(c,sortval,sop->type == SORT_OP_GET);
                }
            }
        }
    } else {
        robj *sobj = createQuicklistObject();

        /* STORE option specified, set the sorting result as a List object */
        for (j = start; j <= end; j++) {
            listNode *ln;
            listIter li;

            if (!getop) {
                listTypePush(sobj,vector[j].obj,LIST_TAIL);
            } else {
                listRewind(operations,&li);
                while((ln = listNext(&li))) {
                    redisSortOperation *sop = ln->value;
                    robj *val = lookupKeyByPattern(c->db,sop->pattern,
                        vector[j].obj,storekey!=NULL);

                    if (sop->type == SORT_OP_GET) {
                        if (!val) val = createStringObject("",0);

                        /* listTypePush does an incrRefCount, so we should take care
                         * care of the incremented refcount caused by either
                         * lookupKeyByPattern or createStringObject("",0) */
                        listTypePush(sobj,val,LIST_TAIL);
                        decrRefCount(val);
                    } else {
                        /* Always fails */
                        serverAssertWithInfo(c,sortval,sop->type == SORT_OP_GET);
                    }
                }
            }
        }
        if (outputlen) {
            setKey(c,c->db,storekey,sobj);
            notifyKeyspaceEvent(NOTIFY_LIST,"sortstore",storekey,
                                c->db->id);
            server.dirty += outputlen;
        } else if (dbDelete(c->db,storekey)) {
            signalModifiedKey(c,c->db,storekey);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
            server.dirty++;
        }
        decrRefCount(sobj);
        addReplyLongLong(c,outputlen);
    }
    /* Cleanup */
    for (j = 0; j < vectorlen; j++)
        decrRefCount(vector[j].obj);

    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}

BY, GET을 SORT에서 쓸 수 없는 것은, BY, GET이 다른 KEY를 가져오는 기능을 사용하기 때문입니다. Cluster 에서는 KEY가 여러 서버에 분산되어 있는데, Redis Cluster에서 서로 KEY를 필요에 따라서 가져오는 것은 허용하지 않는 정책을 쓰고 있습니다. 즉 해당 이슈를 해결하기 위해서는 사용자가 직접 해당 KEY들을 가져와서 필요한 연산을 해줘야 합니다.

  • Multi Key 를 사용하는 명령들은 대부분 같은 slot 에 있을 때만 동작합니다.

Cluster 에서 데이터를 각 노드로 분배하는 것은 hash(key) % 16384 이런 식으로 슬롯이 구분되고, 해당 슬롯이 각 노드에 속하게 됩니다. 그래서 기본적으로 Multi Key에서는 각 Key가 다른 slot 에 속해서, 다른 서버에 속할 수 있습니다.

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(!c->cmd->movablekeys && c->cmd->key_specs_num == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            c->cmd->rejected_calls++;
            return C_OK;
        }
    }

getNodeByQuery 에서 실제로 error code를 채우고 거기에 따라서 clusterRedirectClient 에서 에러 값을 만들어주게 됩니다.

/* Send the client the right redirection code, according to error_code
 * that should be set to one of CLUSTER_REDIR_* macros.
 *
 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
 * are used, then the node 'n' should not be NULL, but should be the
 * node we want to mention in the redirection. Moreover hashslot should
 * be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
    } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
        /* The request spawns multiple keys in the same slot,
         * but the slot is not "stable" currently as there is
         * a migration or import in progress. */
        addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        addReplyError(c,"-CLUSTERDOWN The cluster is down");
    } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
        addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
    } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        addReplyError(c,"-CLUSTERDOWN Hash slot not served");
    } else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    {
        /* Redirect to IP:port. Include plaintext port if cluster is TLS but
         * client is non-TLS. */
        int use_pport = (server.tls_cluster &&
                         c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
        int port = use_pport && n->pport ? n->pport : n->port;
        addReplyErrorSds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot, n->ip, port));
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
}

보통 Multi Key는 같은 slot 에 있는 key 들만 허용이 되므로, 이를 위해서는 hash tag를 잘 이용해서 같은 slot에 배치하는 것이 중요합니다. 키 이름에 {…} 이런식으로 붙여주면 해당 값을 Hash하게 되므로 같은 slot 에 데이터를 추가할 수 있습니다. 문제는 또 이런식으로 같은 slot에 많은 데이터가 들어가면 데이터의 분산이 어려워지게 되므로 주의해야 합니다.

의외로 이런 이유로 Cluster 모드로의 이전에 많은 고생을 하게 됩니다. 그래서 아예 처음에 시작할 때 Redis Cluster 를 한 셋만 구성해서 사용하는 방법도 있습니다. Primary-Replica 한 셋으로만 구성해서 시작하면, 어차피 제약은 기존 Cluster 와 동일하기 때문에, 나중에 서버를 추가할 때 큰 문제 없이 추가할 수 도 있습니다.(그러나 위의 좋은 기능들을 못쓰는건 마찬가지라는…)

[입 개발] Spark SQL Query to Snowflake Query

Data Engineering 을 하다보면, 여러가지 툴을 사용하게 되는데, 그러게 되면서 생기는 필수적인 상황이, 툴에 따른 쿼리의 변환입니다. Hive Query를 사용했다면야, Spark으로 바꿀때는 바로 전환이 되니, 아무런 걱정이 없지만, Redshift라든지, SQL만 해도 Mysql과 다른 RDBMS의 쿼리들이 많이 다릅니다. 특히 요새 가장 인기가 있는 Data Engineer Tool이라면, Databricks나 Snowflake가 있을듯 합니다.(현재는 서로 영역이 좀 다르지만, 곧, 피터지는 싸움이 일어날 것으로 보입니다.)

그런데 Spark SQL을 Snowflake SQL로 바꾸거나, 반대로 바꿀려면 하면 쿼리가 조금 다릅니다. 그런데 이럴 때 리서치를 하다보니 신기한 툴이 있었습니다. phdata(https://www.phdata.io) 를 보면 SQLMorph(https://www.phdata.io/blog/sqlmorph-free-sql-translator-to-snowflake/) 라는 툴을 통해서 이렇게 Query Translation을 시켜준다고 합니다. 바로 사용해보니 뭔가 되는 거 같은… 심지어 Web UI와 Python Tool 까지 제공해 줍니다.(Python Tool은 Snowflake와 몇 개의 DBMS를 사용할 수 없는데, 간단한 잡시도로 사용할 수 있긴 합니다.)

그런데 다 되는지 알고 좋아했는데, 실제로 Spark SQL을 Snowflake로 바꾸면, 제대로 바뀌지 않습니다. 예를 들어 Spark SQL에 Structured Type 을 다 flatten 해주는 explode라는 함수가 있는데, 이런 건 변화를 해주지 않습니다. 특히 snowflake 의 flatten 은 사용하는 문법도 달라서 흑흑흑…

그래서 간단히 바뀌는 문법들만 공유하려고 합니다. 여기서 explode 와 get_json_object 형태가 조금 다르지만 아마도 금방 수정할 수 있을 꺼라고 생각합니다.

Spark SQLSnowflake
ARRAYARRAY_CONSTRUCT
explodeflatten
get_json_objectparse_json
date interval dateadd
regexp_extractregexp_substr
collect_setarray_agg

[입 개발] Spark 에서 Database 빨리 덤프하는 법(Parallelism)

전통적으로 Hadoop 기반에서 Database를 덤프할 때는 sqoop을 많이 사용합니다. 그런데… Spark에서는 그냥 바로 database에서 jdbc를 통해서 데이터를 읽을 수도 있습니다.(의외로 이걸 모르는 경우가 많습니다.)

그래서 우리는 다음과 같이 Database에서 데이터를 덤프합니다.

    val df = spark.read
      .format("jdbc")
      .option("url", mysqlUrl)
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", mysqlUsername)
      .option("password", mysqlPassword)
      .option("useSSL", "false")
      .option("dbtable", table)
      .load()

그런데 위의 코드는 데이터가 적을 때는 잘 동작하지만, 데이터 량이 많아질 수록 점점 느려집니다. 데이터량이 몇백만 몇천만 밖에 안되는 거 같은데, 위의 코드는 몇십분씩 동작합니다.(사실… 돌다가 죽어서 다시 재실행 하는 시간이 더 큰…) 그런데도 잘 동작하지 않습니다.

이건 결국 여러개의 Spark Executor에서 돌지 못하고 한넘이 너무 많은 데이터를 다루다가 결국 메모리가 터져버리는 케이스가 대부분입니다. 그럼 어떻게 해야 할까요? Spark 은 원래 여러 Executor로 실행하기 위해서 쓰는 거 아닌가요 라고 생각할 수 있습니다. 넵넵 맞습니다. 맞구요.(퍽퍽)

그래서 Spark JDBC에는 옵션으로 numPatitions 이라는 것을 제공합니다.

    val df = spark.read
      .format("jdbc")
      .option("url", mysqlUrl)
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", mysqlUsername)
      .option("password", mysqlPassword)
      .option("useSSL", "false")
      .option("dbtable", table)
      .option("numPartitions", numPartitions)
      .load()

오옷 numPartitions만 주면 내부적으로 나눠져서 제대로 동작할 듯 합니다. 그런데 실제로 돌려보면 제대로 동작하지 않습니다. 흑… 원칙적으로 numPatitions가 우리의 해결책이 맞습니다. 다만, 이 옵션을 위해서 추가로 설정해줘야 할 값들이 있습니다. Database를 기준으로 하기 때문에, 어떤 column을 기준으로 분할 할 것인지, 그리고 그 구간을 어떻게 할 것인지를 정해줘야 합니다.

그래서 partitionColumn, lowerBound, upperBound 라는 값이 있습니다. 그러면 저 사이의 값들을 numPartitions 만큼 나눠져서 가져오게 됩니다. 다만 여기서 upperBound 값이 너무 크면 실제 데이터들이 적게 나눠질 수 있습니다. 그런데 또 upperBound 값을 너무 작게 잡으면, 데이터를 다 덤프하지 못할 수 있습니다. 그래서 저는 다음과 같이 구하고 있습니다.

max 개수를 가져오고, 그걸 upperBound 로 설정하고 있습니다.

    val partitionSize = 2000000

    val sizeDF = spark.read
      .format("jdbc")
      .option("url", mysqlUrl)
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", mysqlUsername)
      .option("password", mysqlPassword)
      .option("useSSL", "false")
      .option("query", s"select max($partitionKey) from $table")
      .load()

    val maxId = sizeDF.collect()(0)(0).toString.toLong
    val numPartitions = (maxId / partitionSize) + 1

    val df = spark.read
      .format("jdbc")
      .option("url", mysqlUrl)
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", mysqlUsername)
      .option("password", mysqlPassword)
      .option("useSSL", "false")
      .option("dbtable", table)
      .option("partitionColumn", partitionKey)
      .option("numPartitions", numPartitions)
      .option("lowerBound", 1)
      .option("upperBound", maxId)
      .load()

그렇지만 데이터가 너무 크면 partition 개수가 또 너무 많아질 수 있으므로 여기에 대한 적절한 조절이 필요합니다.

[입 컨설팅] 오일나우에서의 Redis 사용 방법 개선하기 – PART #1

다음 블로그는 Open Up의 도움을 받아서 작성되었습니다.

오일나우는 운전자에게 필요한 정보를 제공하는 서비스를 제공하는 스타트업입니다. 오일나우는 다음과 같은 정보들을 제공하고 있습니다.

오일나우Oilnow
운전자에게 딱 맞는 유용한 정보를 추천해드립니다.
– 알고리즘 적용을 통해 내 위치 인근 가장 가까우면서도 저렴한 주유소를 자동 추천해드립니다.
– 주유 패턴 분석을 통해 유류비 절감을 도와 드립니다.
– 주유비 할인 카드 및 자동차 보험 정보를 비롯해 다양한 금융 상품 정보들을 추천해드립니다.

오일나우의 자세한 정보는 오일나우 팀 블로그에서 확인하실 수 있습니다.

Open Up의 지원을 받아서 해당 오일나우라는 회사에서 Redis 를 쓰는 데 있어서 이슈가 될 만한 부분을 확인하고 개선 방향을 조언하는 시간을 가졌습니다.

일단 오일나우에서는 Redis를 주유소 정보를 확인하기 위해서 Geo Query를 사용하는 형태로 많이 사용하고 있었습니다. Redis 는 Geo Query를 지원하는데 반경 N 킬로미터 이 내의 정보등을 쉽게 구현할 수 있습니다.

Redis 에서 지원하는 Geo Query 관련 Command 는 다음과 같습니다. 보통 GEOADD로 데이터를 추가하고 GEORADIUS 함수를 통해서 관련 정보를 찾을 수 가 있습니다.

예제는 다음과 같습니다.

redis> GEOADD Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"
(integer) 2
redis> GEORADIUS Sicily 15 37 200 km WITHDIST
1) 1) "Palermo"
   2) "190.4424"
2) 1) "Catania"
   2) "56.4413"
redis> GEORADIUS Sicily 15 37 200 km WITHCOORD
1) 1) "Palermo"
   2) 1) "13.36138933897018433"
      2) "38.11555639549629859"
2) 1) "Catania"
   2) 1) "15.08726745843887329"
      2) "37.50266842333162032"
redis> GEORADIUS Sicily 15 37 200 km WITHDIST WITHCOORD
1) 1) "Palermo"
   2) "190.4424"
   3) 1) "13.36138933897018433"
      2) "38.11555639549629859"
2) 1) "Catania"
   2) "56.4413"
   3) 1) "15.08726745843887329"
      2) "37.50266842333162032"
redis> 

그런데 Redis 6.2.0 부터는 GEORADIUS 관련 함수들이 Deprecated 될지도 모르기 때문에 GEOSEARCH나 GEOSEARCHSTORE 함수로 변경하는 것을 권장합니다.

redis> GEOADD Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"
(integer) 2
redis> GEOADD Sicily 12.758489 38.788135 "edge1" 17.241510 38.788135 "edge2"
(integer) 2
redis> GEOSEARCH Sicily FROMLONLAT 15 37 BYRADIUS 200 km ASC
1) "Catania"
2) "Palermo"
redis> GEOSEARCH Sicily FROMLONLAT 15 37 BYBOX 400 400 km ASC WITHCOORD WITHDIST
1) 1) "Catania"
   2) "56.4413"
   3) 1) "15.08726745843887329"
      2) "37.50266842333162032"
2) 1) "Palermo"
   2) "190.4424"
   3) 1) "13.36138933897018433"
      2) "38.11555639549629859"
3) 1) "edge2"
   2) "279.7403"
   3) 1) "17.24151045083999634"
      2) "38.78813451624225195"
4) 1) "edge1"
   2) "279.7405"
   3) 1) "12.7584877610206604"
      2) "38.78813451624225195"
redis> 

오일 나우에서 크게 문제되는 부분은 없었지만, 차후에 문제가 될만한 부분이 KEYS 명령의 사용 개수가 지속적으로 증가하는 것을 발견했습니다. Redis 모니터링을 위해서 KEYS 명령이 사용되고 있었고, 현재는 그렇게까지 많은 정보가 들어가 있지 않았기 때문에, 많은 시간이 걸리지 않았지만, KEYS 명령의 usec_per_call 값이 17811 으로 굉장히 높은 수치였습니다. Redis 에서의 명령의 시간의 측정은 마이크로초 단위인데 즉 1/100000 초입니다. 즉 자주 사용하는 GEORADIUS가 206.43 마이크로 초 인것에 비해서 그거보다 거의 80~90배 정도 느린 속도입니다. 마이크로 초 단위이므로 GEORADIUS는 현재 1초에 500개 정도를 처리할 수 있지만, KEYS는 1초에 5개 밖에 사용할 수 업습니다. 즉, 전체적으로 RADIUS 성능을 떨어뜨리고 있었습니다. 그래서 이를 SCAN으로 변경하도록 가이드를 해서 해당 이슈를 해결 하도록 하였습니다.

그 외에 오일 나우에서 관심을 가지던 부분은 보안 관련 부분이었는데, AWS 의 ElastiCache를 사용하고 있기 때문에, 기본적으로 해당 서비스가 외부에 노출되지 않아서 큰 문제는 없었습니다. 일부의 사용자들이 Redis를 EC2에 바로 돌리면서 해당 포트를 외부에 노출하는 경우가 많은데, Redis의 경우 6.x 부터는 ACL을 지원하기는 하지만, Store 형태의 모든 서비스는 직접적으로 외부에 노출되는 것은 굉장히 보안에 취약합니다. 오일나우에서는 ElastiCache를 사용함으로써 처음부터 이런 보안 이슈를 피하고 있었습니다.

1편은 여기까지 적고 다음 내용들은 2편에서 추가하도록 하겠습니다.

[입 개발] bits로 시간을 얼마나 표현할 수 있을까?

갑자기 bit를 얼마나 할당하는가에 따라서 얼만큼의 시간을 표현할 수 있을까라는 생각이 들었습니다. 흔히 unix timestamp 라고 해서 1970년 1월 1일 0시 부터 현재까지를 Second 단위로 표현한 것입니다. unix timestamp 는 4 bytes 로 표현되고, 이를 2038년 1월 19일 03:14:07 에 overflow 가 난다고 합니다. 그럼 이를 어떻게 계산할 수 있을까요? 사실 원리는 간단합니다. 일단 다음과 같이 시간을 정리해 봅시다. (윤달은 일단 빼고 계산합니다.)

1 minute60 seconds
1 hour60 * 60 = 3600 seconds
1 day60 * 60 * 24 = 86400 seconds
1 weak60 * 60 * 24 * 7= 604800 seconds
1 year60 * 60 * 24 * 365 = 31536000 seconds

이제 이런 시간 테이블이 있으니 생각해보면 4 bytes는 32 bits 이므로 2^32 승입니다. 이것을 간단히 구하면 다음과 같습니다.

>>> pow(2, 32) / 60 / 60 / 24
49710.26962962963
>>> pow(2, 32) / 60 / 60 / 24 / 365
136.19251953323186

4 bytes 로는 Day 로는 49710일, Year 로는 136년을 커버할 수 있습니다. 이러면 1970 + 136 해서 2106년이 나옵니다. 뭔가 이상하지 않나요? 위에서 제가 2038년에 1월 19일 이라고 했는데, 왜 2106년이 나올까요? 사실 이건 간단합니다. unix timestamp 는 time_t(부호가 있는 4 bytes)를 이용하기 때문입니다. 그래서 실제로 32 bits가 아니라 31 bits만 사용하게 됩니다. 31bit 로 계산하면 다음과 같이 68년 1970 + 68년 해서 2038년에 만료가 되는 것입니다.

>>> pow(2, 31) / 60 / 60 / 24
24855.134814814814
>>> pow(2, 31) / 60 / 60 / 24 / 365
68.09625976661593

그렇다면 이제 이걸 위해서 시간 값을 사용하면 bit를 얼마나 할당해야 할까요? unix timestamp 는 seconds 단위지만, 현재는 milliseconds 단위는 최소 커버해야 합니다. 여기서 공식은 다음과 같습니다.

>>> pow(2, N) / 1000 / 60 / 60 / 24 / 365

계산해보면 다음과 같습니다.

BitsYear
37 bits4.35
38 bits8.71
39 bits17.43
40 bits34.86
41 bits69.73
42 bits139.46
43 bits278.92

보통 자신만의 unique key를 만들기 위해서 시간 값을 집어넣는데, 현재 시간을 epoch을 잡는다면 위와 같이 사용이 가능합니다. 서비스가 얼마나 갈지는 모르지만, 최소 39 bit 는 되어야 key 사용이 가능할듯 하네요. 안전하게 할려면 40 bit 이상은 가야할듯 합니다. twitter 의 snowflake 는 42 bit를 사용하고 있습니다.

[입개발] Spark Kafka Streaming 에서의 BackPressure 에 대한 아주 간단한 정리.

Kafka Streaming 처리를 할 때, 자주 문제가 되는 부분은 보통 다음과 같은 것들이 있습니다.

  • 잘못된 입력의 데이터
  • 갑자기 들어온 많은 양의 데이터

잘못된 입력의 데이터는 각자 잘 알아서 처리를 하면되지만? 갑자기 들어오는 많은 양의 데이터는 어떻게 해야할까요? 간단한 방법은 처리하는 서버 대수를 늘리는 것입니다. 그런데 평소에는 2대면 충분한데, 피크때는 10대 정도가 필요한 상황이면 어떻게 해야할까요? 방법은 간단합니다. 그냥 항상 10대를 돌리면 됩니다. 돈만 많다면…(그러나 저는 거지입니다. T.T)

두번째 방법은 Spark의 Auto Scaling 을 이용하는 방법입니다. spark.streaming.dynamicAllocation 설정을 이용해서 Scaling이 가능합니다. 아래와 같은 설정을 이용합니다.(이 방법은 DStream을 사용할때만 유용하고, Structured Streaming 하고는 상관없을 수 있습니다.)

spark.dynamicAllocation.enabled=false
spark.streaming.dynamicAllocation.enabled=true
spark.streaming.dynamicAllocation.minExecutors=2
spark.streaming.dynamicAllocation.maxExecutors=8

#default
spark.streaming.dynamicAllocation.scalingInterval=60
spark.streaming.dynamicAllocation.scaleUpRatio=0.9
spark.streaming.dynamicAllocation.scaleDownRatio=0.3

원래 spark의 Dynamic Resource Allocation(DRA)이 Streaming 과는 상성이 맞지 않았습니다. Spark의 DRA는 Idle Timedㅡㄹ 체크하는데 Spark의 Streaming은 Micro 배치라 계속 일정시간 마다 작업을 하게되어서 Executor를 놓지 않아서, 늘어는 나도 줄어들지 않게 됩니다.

그래서 Spark 2.x에 나온것이 spark.streaming.dynamicAllocation 이 생겼습니다. spark.streaming.dynamicAllocation 정책은 처음부터 리소스를 전부 가져가지 않고, 다음과 같이 ratio를 계산해서 이를 이용하게 됩니다.

ratio = processing time / processing interval
  • ratio > spark.streaming.dynamicAllocation.scaleUpRatio => executor 1개 추가
  • ratio < spark.streaming.dynamicAllocation.scaleDownRatio => executor 1개 제거

이 값이 minExecutors, maxExecutors 설정의 최대치 까지 적용이 됩니다. 다만 이처리는 ExecutorAllocationManager 에서 처리되는데, 하나의 배치 안에서 증가/추가되지 않고, onBatchCompleted 가 호출될때 마다 실행시간 정보가 반영되어서 이번 Micro 배치가 끝나고 다음번에 영향을 주게 됩니다. 즉 하나의 배치 타이밍만에서만 크게 생기면 효과를 보기 힘들지만, 특정 시간동안 이슈가 있다면 서서히 증가해서 서서히 줄어들게 됩니다.

spark streaming dynamicAllocation을 쓸려면 기존의 spark.dynamicAllocation.enabled=false로 설정해야 합니다.

이렇게 Auto Scaling을 설정하면 모든게 해결될 것 같지만, 그렇지 않습니다. 일단 대부분의 Auto Scaling은 min/max 설정이 있습니다. 이 얘기는 해당 수준보다 더 데이터가 들어오면 결국은 뭔가 데이터 처리에 문제가 장애가 발생할 수 있다는 얘기가 됩니다. 그리고 비용도 더 든다는 얘깁니다. 그리고 또 문제가 되는것은 위의 Auto Scaling 설정도 위의 설정이 반영되는 것은, 엄청 많은 데이터가 들어온다면, 최소한 한번 그 많은 양을 처리해야 합니다.

그래서 이제 도리어 생각을 살짝 바꿔보면, 그냥 아주 느리게 평소에 처리할 수 있는 양 까지만 처리하면 되지 않을까라는 생각을 할 수 있습니다. 그래서 일정 수준까지만 처리하겠다라는 개념이 BackPressure 입니다.

보통 Spark에서 Kafka Streaming 에서 배치 때 처리하는 양은 지난번 처리한 마지막 offset 에서 현재 마지막 offset 까지를 가져와서 처리를 하게 됩니다. 즉, 한번에 1000개씩 처리하던 배치가 1분만에 10000 개가 들어오면, 그 배치 기간에는 10000개를 처리하게 됩니다. 이런 경우가 발생하는 경우가 다음과 같은 두 가지 경우가 있습니다.

  • offset 정책이 earliest 면서 offset 정보가 없어서 처음으로 시작될 때
  • 장애나, 트래픽이 늘어나서, 마지막 처리한 offset 과 현재의 offset 정보 차이가 많은 경우.

BackPressure를 설정하면, 딱 그만큼만 계속 가져오게 됩니다. 즉 1분마다 처리되는 Streaming 인데, BackPressure가 1000으로 설정되면 1분마다 1000개씩만 처리하게 됩니다. 전체 처리는 늦어지지만, 처리하는 입장에서는 부하가 늘어나지 않습니다. Spark 에서 Kafka Streaming은 DStream 과 Structured Streaming에서 셋팅 방법이 좀 다릅니다.(자세한 옵션을 다루지는 않습니다.)

DStream에서의 설정

spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=100000
spark.streaming.receiver.maxRate=100000
spark.streaming.kafka.maxRatePerPartition=20000

Structured Streaming 에서의 설정

코드에서 추가해야 합니다.

option("maxOffsetsPerTrigger", "100000")

Structured Streaming에서 주의할 점

Structured Streaming 에서는 checkpoint를 spark에서 관리를 하면서 다음 Offsets을 미리 만들어두고 진행하게 됩니다. 이게 생기게 되면, 해당 Offset 을 가져오려고 시도를 하고 maxOffsetsPerTrigger가 이 다음에 설정되면 해당 배치에는 적용이 되지 않고 그 다음 배치부터 적용되게 됩니다.(이 때는 최종 commit 된 다음 번호의 offset을 지워주면 됩니다.)

다음 offset 을 계산하는 방법에서 RateLimit을 설정해두는데, 이게 항상 현재 Kafka의 마지막 Offset이 되게 됩니다. 그런데 maxOffsetsPerTrigger 가 설정되면, RateLimit 가 해당 값으로 설정되서, 다음 Offset 이 현재 처리된 Offset + rateLImit 로 설정이 됩니다. 이를 이용해서 일종의 BackPressure가 설정이 되게 되는 것입니다. 해당 내용은 이전 블로그를 참고하세요. [입 개발] Spark Structured Streaming 에서 Offset 은 어떻게 관리되는가(아주 간략한 버전)? | Charsyam’s Blog (wordpress.com)

그런데 DStream 방식과 Structured Streaming에서 방식이 달라지는데, 이와 관련해서 정리를 하다보니 [SPARK-24815] Structured Streaming should support dynamic allocation – ASF JIRA (apache.org) 현재 dynamic allocation이 Structured Streaming 하고는 뭔가 어울리지 않는 것 같습니다. 그런데 또 코드를 보면 설정이 상관이 없는건 아닌듯 한… 코드를 보면 BackPressure 설정도 maxOffsetPerTrigger 는 kafka에만 존재하는 값이라 external/kafka-0-10-sql 안에 존재하고 있습니다.

뭔가… 정리하다 보니 이상하게 끝나버리지만… 대충 이렇습니다.

[입 개발] Redis가 maxmemory 보다 더 썼다가 OOM 에러를 던져요!!!

Redis를 운영하다보면 항상 어려운 문제는 memory 관리입니다. In-Memory Cache 다 보니, 메모리 보다 더 많은 데이터를 써서 swap이 발생하면 해당 메모리 page를 접근할 때 마다 swap out 이 발생해서, 속도에 엄청난 영향을 주게 됩니다. 또한 더 많은 메모리를 쓰면, 메모리 문제로 장애가 발생할 수 가 있습니다. 그런데 오늘 팀에서 다음과 같은 에러가 발생했다고 보고를 해주셨습니다.

redis.clients.jedis.exceptions.JedisDataException: OOM command not allowed when used memory > 'maxmemory'.

Redis는 이런 메모리 관리를 위해서 두 가지 옵션을 제공하고 있습니다. 첫 번째는 maxmemory 설정이고 두 번째는 maxmemory policy 입니다.

maxmemory는 메모리를 이것 이상 사용하지 않도록 설정하는 옵션입니다. 내부적으로 메모리 할당에 zmalloc 이라는 함수를 이용하는데, 이 안에서 메모리 할당 사이즈를 계산하고 이를 이용합니다. 다만 Redis에서 할당을 요청하는 값이기 때문에, 실제 메모리 page는 일반적으로 더 사용하게 되고 이로 인해 실제 물리 메모리 사용량은 계산한 값보다 훨씬 더 많이 사용하고 있을 수 있습니다. 이를 통해서 필요할 때, 사용하던 메모리를 반납해서 메모리 용량을 확보하게 되는데 이를 eviction 이라고 합니다.

maxmemory-policy 는 메모리 사용량이 maxmemory 보다 커졌을 때 어떤 정책을 취할 것인가를 정해둔 정책입니다. 다음과 같은 정책 값들이 있습니다.

정책명내용
noevictioneviction 작업을 하지 않고, 바로 write 작업시에 에러를 리턴한다.
volatile-lruexpire set(expire를 건 키집합) 에 대해 LRU 방식을 이용해서 키를 제거한다. 정확한 LRU 방식이 아니라 유사한 방식을 사용
volatile-lfuexpire set 에 대해 LFU 유사한 방식을 이용해서 키를 제거한다.
volatile-randomexpire set 에 대해 랜덤하게 키를 제거한다.
volatile-ttlexpire set에 대해서 ttl이 적게 남은 순으로 키를 제거한다.
allkeys-lru모든 키에 대해서 LRU 유사한 방식으로 키를 제거한다.
allkeys-lfu모든 키에 대해서 LFU 유사한 방식으로 키를 제거한다.
allkeys-random모든 키에 대해서 랜덤하게 키를 제거한다.

여기서 LRU는 Least Recently Used, LFU는 Least Frequently Used 입니다.

각각 conf 에 다음과 같이 설정이 가능합니다.

maxmemory <bytes>
maxmemory-policy <policy>

그럼 Redis 는 언제 이 eviction을 실행하려고 할까요? eviction이 발생하는 사용자가 Command를 실행하려고 할 때 입니다. 만약 현재 사용중인 메모리(used memory)가 maxmemory 설정보다 크다면 발생하게 됩니다. 먼저 processCommand 함수를 살펴봅시다.

int processCommand(client *c) {
    ......
    int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
                             (c->cmd->proc == execCommand && 
                             (c->mstate.cmd_flags & CMD_DENYOOM));

    ......
        /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = (performEvictions() == EVICT_FAIL);
        /* performEvictions may flush slave output buffers. This may result
         * in a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        int reject_cmd_on_oom = is_denyoom_command;
        /* If client is in MULTI/EXEC context, queuing may consume an unlimited
         * amount of memory, so we want to stop that.
         * However, we never want to reject DISCARD, or even EXEC (unless it
         * contains denied commands, in which case is_denyoom_command is already
         * set. */
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand &&
            c->cmd->proc != discardCommand &&
            c->cmd->proc != resetCommand) {
            reject_cmd_on_oom = 1;
        }

        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr);
            return C_OK;
        }

        /* Save out_of_memory result at script start, otherwise if we check OOM
         * until first write within script, memory used by lua stack and
         * arguments might interfere. */
        if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
            server.lua_oom = out_of_memory;
        }
    }
    ......
}

위의 코드를 살펴보면 server.maxmemory 값이 설정되어 있고, server.lua_timeout 이 0이어야만 실행이 되게 되어있습니다. 여기서 out_of_memory 변수에 performEDvcitons() 의 결과 값이 EVICT_FAIL 인지 체크하고 해당 이슈가 실패하면, 해당 command 가 is_denyoom_command 인지를 체크해서 is_denyoom_command 라면, shared.oomerr 를 리턴하게 됩니다.

shared.oomerr 는 다음과 같이 정의되어 있습니다. 아까 어디선가 본듯한 문구이죠?

    shared.oomerr = createObject(OBJ_STRING,sdsnew(
        "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));

그럼 이제 performEvictions 함수를 살펴봅시다.

/* Check that memory usage is within the current "maxmemory" limit.  If over
 * "maxmemory", attempt to free memory by evicting data (if it's safe to do so).
 *
 * It's possible for Redis to suddenly be significantly over the "maxmemory"
 * setting.  This can happen if there is a large allocation (like a hash table
 * resize) or even if the "maxmemory" setting is manually adjusted.  Because of
 * this, it's important to evict for a managed period of time - otherwise Redis
 * would become unresponsive while evicting.
 *
 * The goal of this function is to improve the memory situation - not to
 * immediately resolve it.  In the case that some items have been evicted but
 * the "maxmemory" limit has not been achieved, an aeTimeProc will be started
 * which will continue to evict items until memory limits are achieved or
 * nothing more is evictable.
 *
 * This should be called before execution of commands.  If EVICT_FAIL is
 * returned, commands which will result in increased memory usage should be
 * rejected.
 *
 * Returns:
 *   EVICT_OK       - memory is OK or it's not possible to perform evictions now
 *   EVICT_RUNNING  - memory is over the limit, but eviction is still processing
 *   EVICT_FAIL     - memory is over the limit, and there's nothing to evict
 * */
int performEvictions(void) {
    if (!isSafeToPerformEvictions()) return EVICT_OK;

    int keys_freed = 0;
    size_t mem_reported, mem_tofree;
    long long mem_freed; /* May be negative */
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = EVICT_FAIL;

    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return EVICT_OK;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return EVICT_FAIL;  /* We need to free memory, but policy forbids. */

    unsigned long eviction_time_limit_us = evictionTimeLimitUs();

    mem_freed = 0;

    latencyStartMonitor(latency);

    monotime evictionTimer;
    elapsedStart(&evictionTimer);

    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            if (keys_freed % 16 == 0) {
                /* When the memory to free starts to be big enough, we may
                 * start spending so much time here that is impossible to
                 * deliver data to the replicas fast enough, so we force the
                 * transmission here inside the loop. */
                if (slaves) flushSlavesOutputBuffers();

                /* Normally our stop condition is the ability to release
                 * a fixed, pre-computed amount of memory. However when we
                 * are deleting objects in another thread, it's better to
                 * check, from time to time, if we already reached our target
                 * memory, since the "mem_freed" amount is computed only
                 * across the dbAsyncDelete() call, while the thread can
                 * release the memory all the time. */
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }

                /* After some time, exit the loop early - even if memory limit
                 * hasn't been reached.  If we suddenly need to free a lot of
                 * memory, don't want to spend too much time here.  */
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    // We still need to free memory - start eviction timer proc
                    if (!isEvictionProcRunning) {
                        isEvictionProcRunning = 1;
                        aeCreateTimeEvent(server.el, 0,
                                evictionTimeProc, NULL, NULL);
                    }
                    break;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    /* at this point, the memory is OK, or we have reached the time limit */
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;

cant_free:
    if (result == EVICT_FAIL) {
        /* At this point, we have run out of evictable items.  It's possible
         * that some items are being freed in the lazyfree thread.  Perform a
         * short wait here if such jobs exist, but don't wait long.  */
        if (bioPendingJobsOfType(BIO_LAZY_FREE)) {
            usleep(eviction_time_limit_us);
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
            }
        }
    }

    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}


여기서 먼저 살펴볼 것은 리턴 값입니다. EVICT_OK는 used_memory가 maxmemory 보다 줄어들었거나, 아직 eviction을 수행할 수 가 없다는 것입니다. EVICT_RUNNING은 현재 eviction 이 계속 진행중이라는 뜻입니다. Single Threaded인 Redis가 어떻게 이렇게 동작할 수 있는지는 뒤에서 설명하겠습니다. 그리고 마지막으로 EVICT_FAIL은 여전히 메모리를 maxmemory 보다 많이 사용하지만, 현재 더 eviction해서 메모리를 줄일 데이터가 없다는 뜻입니다. 이 EVICT_FAIL이 나면 Redis는 OOM 에러를 던질 수 있습니다.

코드를 살펴보면 먼저 isSafeToPerformEvictions 함수가 나옵니다. 현재 eviction을 수행할 수 있는 상태인지를 살펴봅니다. 그래서 무시해도 되는 상황이면 eviction을 진행하지 않도록 0을 리턴합니다. 1이면 eviction 을 진행합니다.

/* Check if it's safe to perform evictions.
 *   Returns 1 if evictions can be performed
 *   Returns 0 if eviction processing should be skipped
 */
static int isSafeToPerformEvictions(void) {
    /* - There must be no script in timeout condition.
     * - Nor we are loading data right now.  */
    if (server.lua_timedout || server.loading) return 0;

    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return 0;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;

    return 1;
}

두번째로는 getMaxmemoryState 를 호출합니다. getMaxmemoryState는 현재 사용한 메모리 정보를 zmalloc_used_memory() 함수를 통해서 가져옵니다.

/* Get the memory status from the point of view of the maxmemory directive:
 * if the memory used is under the maxmemory setting then C_OK is returned.
 * Otherwise, if we are over the memory limit, the function returns
 * C_ERR.
 *
 * The function may return additional info via reference, only if the
 * pointers to the respective arguments is not NULL. Certain fields are
 * populated only when C_ERR is returned:
 *
 *  'total'     total amount of bytes used.
 *              (Populated both for C_ERR and C_OK)
 *
 *  'logical'   the amount of memory used minus the slaves/AOF buffers.
 *              (Populated when C_ERR is returned)
 *
 *  'tofree'    the amount of memory that should be released
 *              in order to return back into the memory limits.
 *              (Populated when C_ERR is returned)
 *
 *  'level'     this usually ranges from 0 to 1, and reports the amount of
 *              memory currently used. May be > 1 if we are over the memory
 *              limit.
 *              (Populated both for C_ERR and C_OK)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
    size_t mem_reported, mem_used, mem_tofree;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory();
    if (total) *total = mem_reported;

    /* We may return ASAP if there is no need to compute the level. */
    int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
    if (return_ok_asap && !level) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Compute the ratio of memory usage. */
    if (level) {
        if (!server.maxmemory) {
            *level = 0;
        } else {
            *level = (float)mem_used / (float)server.maxmemory;
        }
    }

    if (return_ok_asap) return C_OK;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;

    if (logical) *logical = mem_used;
    if (tofree) *tofree = mem_tofree;

    return C_ERR;
}

그리고 혹시나 maxmemory_policy 가 MAXMEMORY_NO_EVICTION이면 eviction을 안하니 바로 EVICT_FAIL 로 리턴합나다. 첫 번째로 OOMERR를 볼 수 있는 상황입니다.(maxmemroy_policy가 noeviction 일때…)

이제는 while 루프를 돌면서 메모리를 해제하게 됩니다. mem_tofree는 used – maxmemory 값 즉, 지금 얼마나 메모리를 해제해야 하는지를 나타내는 값이며, mem_freed 는 현재까지 확보한 메모리 크기입니다.

    while (mem_freed < (long long)mem_tofree) {
        ......
    }

이제 maxmemory-policy 정책에 따라서 조금 달라지게 됩니다. volatile 계열은 expire set 만(expire 로 ttl이 걸린 key들), ALLKEY 는 모든 key들을 대상으로 하기 때문에, 어떤 db에서 이를 처리할지가 결정되게 됩니다. Redis는 내부적으로 expire set을 관리하기 위해서 expires 라는 내부 변수를 가지고 있습니다. 전체 데이터는 dict 안에 있습니다.

Redis 는 eviction 작업을 좀 쉽게 하기 위해서 먼저 일부를 샘플링해서 eviciton 대상 pool을 만들고 이것을 eviction 하게 됩니다.

maxmemory-policy 가 LRU/LFU 종류거나 volatile_ttl 이면 다음과 같이 동작합니다.

현재 발견된 bestkey(eviction 대상) 가 없다면 evictionPoolPopulate 함수를 통해서 대상 풀을 먼저 만들고 여기서 bestkey를 찾아보게 됩니다.

만약에 maxmemory-policy가 RANDOM 계열이면, 정말 랜덤키를 가져와서 지우게 됩니다.

위의 두 경우에 bestkey를 찾는 것이 실패하면 cant_free로 점프하게 되고 EVICT_FAIL을 리턴하게 됩니다. 이 때 LAZY_FREE를 쓰면 조금 다르게 동작할 수 도 있습니다.(LAZY_FREE면 잠시 sleep 후에도 메모리 상황이 used_memory > maxmemory 이면 EVICT_FAIL 아니면 EVICT_OK를 던집니다. LAZY_FREE니 잠시 기다려보고 줄어들면 OK라는 거죠.)

짧게 Redis가 메모리가 부족할 때 eviction을 어떻게 처리하는지를 살펴보았습니다. 뭐 OOM 에러가 난다는 것은, 메모리가 부족한 상황이라는 것이므로, maxmemory-policy 정책을 바꿀 수 있다면… 바꾸건…(데이터가 날아가도 된다면, volatile 말고 allkey 로…) 아니면 메모리를 증설하는 것이 좋은 방법입니다.

[입 개발] Databricks Terraform Provider databricks_aws_s3_mount 와 resource databricks_dbfs_file

  • databricks terraform proivder 에서 databricks_aws_s3_mount 인데 datatbricks_s3_mount 라고 문서에 오타나있음.
  • databricks_dbfs_file 에서 0.2.9 까지는 다음과 같은 옵션이 필수(content_b64_md5)
resource "databricks_dbfs_file" "my_dbfs_file" {
  source = pathexpand("README.md")
  content_b64_md5 = md5(filebase64(pathexpand("README.md")))
  path = "/sri/terraformdbfs/example/README.md"
  overwrite = true
  mkdirs = true
  validate_remote_file = true
}
  • databricks_dbfs_file 에서 0.3.0 부터는 필요없어짐
resource "databricks_dbfs_file" "this" {
  source = "${path.module}/main.tf"
  path = "/tmp/main.tf"
}
  • databricks_aws_s3_mount 는 cluster_id 나 instance_profile 중에 하나가 있어야 하고 cluster_id 가 비면, 클러스터를 생성하고 mount 한 다음 종료한다.
  • databricks_aws_s3_mount 는 해당 cluster와 instance_profile 의 권한으로 mount 를 하므로, 다른 사용자는 해당 내용의 권한이 없으면 볼 수 없다.
  • databricks_dbfs_file 로 추가한 것은 누구나 볼 수 있다.

[입 개발] EMR에서는 sc.addFile, Databricks에서는 그냥 dbfs 폴더를 이용하자.

기존에 특정 파일을 Spark 클러스터에서 쓰기 위해서는 다음과 같은 방법을 이용했습니다. S3에 파일을 올리고 이를 addFile 한 후 해당 경로에 있다고 하고 사용하는 방식입니다.

EMR

val FileName = "FileName.dat"
val S3_PATH = s"s3://abcd/efg/hijk/$FileName"
spark.sparkContext.addFile(S3_PATH)

ReadFile(FileName)

Databricks 로 가면 위의 방식을 더 이상 사용할 수 없습니다. java.io.FileNotFound Exception이 발생하게 됩니다. 이를 위해서 어떻게 해야할까요? dbfs 가 Spark 클러스터에 마운트 된다는 것을 이용합니다. dbfs 에 파일을 올려두고 다음과 같이 사용하면 됩니다. 경로의 매칭이 어떻게 되는지만 주의하시면 됩니다.

Databricks

val FileName = "FileName.dat"
val DBFS_PATH = s"/dbfs/abcd/efg/hijk/$FileName"
ReadFile(DBFS_PATH)