[입 개발] I don’t know DNS Caching

흐음, 입 개발 전문가 CharSyam  입니다. 나름 입 개발을 오래해보긴 하고, DNS 프로토콜도 직접 구현해보고, Dynamic DNS를 Zookeeper 기반으로 만들어보기도 해서 잘 안다고(이렇게 적고 실제로는 일도 모른다고 읽으시면 됩니다.) 생각했는데… 제 상식을 깨는 일이 발생했습니다.(다른 분들의 상식이 아니라 제 상식이니 무시하시면 됩니다.)

아래와 같은 코드가 존재합니다. 여기서 http://www.naver.com 은 몇번 호출이 될까요?

import requests
requests.get('http://www.naver.com')
requests.get('http://www.naver.com')

 한번도 안한다. 한번만 한다. 두번 한다. 세번 한다. 네번 한다. 정답은 설정과 OS에 따라 다르다입니다.(퍽퍽퍽, 죽어!!!) 디폴트 설정이라는 가정하에서는 Windows와 Linux 에서의 설정이 또 다릅니다. 이게 무슨 소리냐 하면 Windows 는 OS 레벨에서의 DNS가 캐싱이 되므로 아마, 위의 코드는 한번도 안할 수도 있습니다.(이전에 했다면…), 처음 실행된다는 가정하에서는 그럼 한번만 되겠죠. 그런데 여기서는 이제 Linux 쪽, 특히 Debian 계열로 한정을 짓는다면, 위의 코드는 4번의 DNS 호출을 하게 됩니다.(왜 두번이 아니라… 그것은 http://www.naver.comhttps://www.naver.com 으로 redirection 되기 때문에~~~), 그런데 엉? 4번이라고? 처음이라도 한번만 되어야 되는게 아니야?

먼저 tcpdump 를 설치합니다.

sudo apt-get install tcpdump

그리고 udp 53번을 모니터링 합니다.

sudo tcpdump -i eth0 udp port 53

그리고 위의 코드를 실행시키면… 다음과 같은 결과가 나옵니다.

01:32:31.303838 IP 192.168.0.2.60630 > acns.uplus.co.kr.domain: 30077+ A? www.naver.com. (31)
01:32:31.303846 IP 192.168.0.2.60630 > pcns.bora.net.domain: 30077+ A? www.naver.com. (31)
01:32:31.303856 IP 192.168.0.2.60630 > pcns.bora.net.domain: 45919+ AAAA? www.naver.com. (31)
01:32:31.306473 IP pcns.bora.net.domain > 192.168.0.2.60630: 30077 3/3/3 CNAME www.naver.com.nheos.com., A 210.89.160.88, A 210.89.164.90 (199)
01:32:31.306983 IP acns.uplus.co.kr.domain > 192.168.0.2.60630: 30077 3/3/3 CNAME www.naver.com.nheos.com., A 125.209.222.142, A 210.89.164.90 (199)
01:32:31.311150 IP pcns.bora.net.domain > 192.168.0.2.60630: 45919 1/1/0 CNAME www.naver.com.nheos.com. (116)
01:33:08.638991 IP 192.168.0.2.60630 > acns.uplus.co.kr.domain: 31222+ A? www.naver.com. (31)
01:33:08.638999 IP 192.168.0.2.60630 > pcns.bora.net.domain: 31222+ A? www.naver.com. (31)
01:33:08.639010 IP 192.168.0.2.60630 > pcns.bora.net.domain: 64566+ AAAA? www.naver.com. (31)
01:33:08.642771 IP pcns.bora.net.domain > 192.168.0.2.60630: 64566 1/1/0 CNAME www.naver.com.nheos.com. (116)
01:33:08.642781 IP pcns.bora.net.domain > 192.168.0.2.60630: 31222 3/3/3 CNAME www.naver.com.nheos.com., A 125.209.222.141, A 210.89.160.88 (199)
01:33:08.643297 IP acns.uplus.co.kr.domain > 192.168.0.2.60630: 31222 3/3/3 CNAME www.naver.com.nheos.com., A 125.209.222.141, A 210.89.160.88 (199)
......

우리는 OS 레벨의 DNS Caching을 기대하지만, 이 얘기는 기본적으로 OS 레벨에서의 DNS 캐시가 꺼져있다라는 얘기가 됩니다.(대부분의 linux에서 꺼져있다라는…) DNS Level Failover를 적용할려고 하다가, Python 에서는 DNS Caching 이 어떻게 이루어지는지를 보려고 하다보니…, 우연히 https://stackoverflow.com/questions/11020027/dns-caching-in-linux 를 찾게 되었는데…(역시 갓 SO, 참고로 JVM 에서는 DNS Caching 이 영구적이라, 특정 옵션을 주지 않으면, 처음 받게 된 ip를 계속 사용하게 되므로, 기본 옵션으로는 DNS Level Failover를 쓸 수 없습니다.)

여기서 “꺼져있다”, “꺼져있다”, “꺼져있다”를 보고 충격을 받았습니다. 그래서 위와 같이 실험을 했더니… 사실이었습니다. 자세한 내용은 위의 SO 글을 읽으시면… 잘 알게 되는데, glibc의 getaddrinfo 자체에서 발생하는 이슈라고 해서, 다음과 같이 코드를 실행해 봤습니다.

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
    struct addrinfo hints;
    struct addrinfo *result;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = 0;
    hints.ai_protocol = 0;

    for (int i = 0; i < atoi(argv[1]); i++) {
        int s = getaddrinfo("www.naver.com", NULL, &hints, &result);
        if (s != 0) {
            fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
            exit(-1);
        }
        printf("%d\n", i);
        sleep(10);
    }
    return 0;
}

실행해보면, 매번 쿼리가 날라가는 것을 볼 수 있습니다. 즉, 이 이야기는, DNS Caching을 app 수준에서 따로 제어하지 않는다면, DNS 쿼리를 매번 호출하게 된다라는 얘기가 됩니다. 보통 DNS 쿼리를 굉장히 자주 날리는 것은 성능상 문제를 일으킬 수 있습니다. 실제로 이걸 잘 하는게 쉽지는 않을듯 하네요.

glibc 코드를 살짝 까보면, getaddrinfo 는 gaih_inet 이라는 함수를 호출해서 결과를 가져옵니다. gaih_inet 는 USE_NSCD가 켜져있으면 NSCD에서 캐시된 결과를 찾는 것으로 보이고, 그게 아니라면, 일단 hosts 파일에 있는지 체크합니다. 이래서 hosts에 등록하면 DNS 쿼리 없이 항상 제일 먼저 가져오게 됩니다. 그 뒤에 옵션이나 상황에 따라, gethostbyname2_r, gethostbyname3_r, gethostbyname4_r 을 콜하게 됩니다. 이 함수들은 실제로 resolv/nss_dns/dns-host.c 에 있는 _nss_dns_gethostbynameX_r 함수들과 매핑이 되고 여기서 DNS Query를 날리고 가져오게 됩니다.

즉, 이 얘기는 OS단에서 해주는게 없을 가능성이 높고, 지금 DNS 쿼리는 여전히 발생하고 있을지 모른다는 얘기다 됩니다.(일단 저는 이렇게 이해했는데… 잘 아시는 분 답변좀 T.T), 이걸 해결하기 위해서는 nscd 를 설치해야만, OS 레벨의 캐싱이 적용이 되게 됩니다. 아니면, app 레벨에서 직접 해줘야…

일단 자바의 경우는 jvm 레벨에서 DNS 캐싱이 적용되어 있습니다. 그래서 DNS Based Failover를 하려면 좀 더 자세히 알아야 합니다. jdk 소스를 보면 InetAddress.java, InetAddressCachePolicy.java 이 있습니다. InetAddress.java 에서 InetAddressCachePolicy 클래스를 사용하는데, 여기의 기본 옵션이 FOREVER 입니다. 그리고 InetAddress 에서 getCachedAddresses 함수를 호출하면서, 매번 위의 Policy를 확인하고, 위의 값들을 조절하면 networkaddress.cache.ttl 는 내부적으로 사용할 TTL 의 값(가져와서 ttl 확인하고 expire 시키네요.) 두 definition networkaddress.cache.ttl 이나 sun.net.inetaddr.ttl 을 먼저 체크해서 하나라도 값이 있으면, 해당 값으로 설정이 되고 없으면 SecurityManager를 체크하는데 이때 SecurityManager도 없으면 기본 TTL이 30초로 설정되게 됩니다.

[입 개발] Kafka 에서 auto.offset.reset 의 사용법

최근에 서버로그를 컨슈밍해서 뭔가 작업할 일이 생겼습니다. 데이터를 매일 보면서 실제 얼마나 이벤트가 있었는지 볼려고 하니, 실제 로그의 수와 저장된 이벤트의 수가 엄청 다른 것입니다.

확인을 해보니, 로그가… 몇일 전꺼부터 계속 돌고 있었습니다. -_-;;; 즉, 몇일 치가 하루데이터라고 생각하고 계속 저장되고 있었으니, 생각하는 값보다 훨씬 많이 오버된 T.T

저만 그런 문제가 있는가 해서 뒤져보니, auto.offset.reset 이라는 환경설정 값을 찾을 수 있었습니다. auto.offset.reset 은 다음과 같이 설명이 되어 있습니다.

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest

일단 설정이 안되었을 때의 기본 값은 largest 입니다.(아주 옛날 소스는 autooffset.reset 이라는 설정에, latest 라는 설정으로 되어있었지만… 현재는 바뀌어 있습니다.)

smallest 는 가지고 있는 오프셋 값 중에 가장 작은 값을 사용합니다. largest는 반대로 가장 최신 offset 을 사용하게 됩니다.

handleOffsetOutOfRange 에서는 autoOffsetReset 값을 통해서 startTimestamp를 설정하고 실제 newOffset 을 earliestOrLatestOffset 을 통해서 가져오게 됩니다.

<pre>def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val startTimestamp = config.autoOffsetReset match {
    case OffsetRequest.SmallestTimeString =&gt; OffsetRequest.EarliestTime
    case OffsetRequest.LargestTimeString =&gt; OffsetRequest.LatestTime
    case _ =&gt; OffsetRequest.LatestTime
  }
  val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
  val pti = partitionMap(topicAndPartition)
  pti.resetFetchOffset(newOffset)
  pti.resetConsumeOffset(newOffset)
  newOffset
}</pre>

그럼 이 옵션을 언제 적용하게 되느냐? 즉, 바꿔말하면 handleOffsetOutOfRange이 언제 호출되는가를 알아야 합니다. AbstractFetcherThread.scala를 보게 되면 Errors.NONE 일 때는 해당 결과 offset 으로 현재 offset을 업데이트 합니다. partitionMap에 PartitionFetchState 로 그 offset을 저장합니다.

<pre>private def processFetchRequest(fetchRequest: REQ) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var responseData: Map[TopicAndPartition, PD] = Map.empty

  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    responseData = fetch(fetchRequest)
  } catch {
    case t: Throwable =&gt;
      if (isRunning.get) {
        warn(s"Error in fetch $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionMap.keys
          // there is an error occurred while fetching partitions, sleep a while
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) {
    // process fetched data
    inLock(partitionMapLock) {

      responseData.foreach { case (topicAndPartition, partitionData) =&gt;
        val TopicAndPartition(topic, partitionId) = topicAndPartition
        partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =&gt;
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
            Errors.forCode(partitionData.errorCode) match {
              case Errors.NONE =&gt;
                try {
                  val messages = partitionData.toByteBufferMessageSet
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) =&gt; m.nextOffset
                    case None =&gt; currentPartitionFetchState.offset
                  }
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
                } catch {
                  case ime: CorruptRecordException =&gt;
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    // should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
                  case e: Throwable =&gt;
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentPartitionFetchState.offset), e)
                }
              case Errors.OFFSET_OUT_OF_RANGE =&gt;
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =&gt;
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =&gt;
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    partitionData.exception.get))
                  partitionsWithError += topicAndPartition
                }
            }
          })
      }
    }
  }

  if (partitionsWithError.nonEmpty) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}</pre>

두 번째로는 Errors.OFFSET_OUT_OF_RANGE 에러가 발생했을 때입니다. 이 에러가 발생하면, 아까 본 handleOffsetOutOfRange 함수를 통해서 다시 시작 newOffset을 설정하게 됩니다. 그럼 이 에러는 언제 발생하게 될까요?

Kafka는 consumer 가 자신이 처리한 위치 까지의 offset을 가지고 있는 구조입니다.(보통 주키퍼에 /consumers/{groupid} 형태로 기록이 되지요.) 그러니, 주키퍼의 해당 consumers 목록에 없다면 해당 설정대로 값을 읽게 됩니다. 두 번째로 이미 데이터가 접근할 수 없는 offset 즉, 이미 지워진 상태면… 해당 값을 읽어올 수 없다면, auto.commit.reset의 설정이 동작하게 됩니다.

저는 처음에 이것이 kafka 내에 설정값이 있고 해당 값보다 크면, 항상 새로 읽을 줄 알았습니다. T.T

그럼, 어떻게 항상 최신 데이터부터만 읽을 것인가? 가장 쉬운 방법은 해당 groupid에 속하는 오프셋을 필요할 때마다 지워주는 것입니다. 이러면 항상 최신의 데이터 부터 읽어오는 것을 알 수 있습니다. 두 번째는 굉장히 빨리 데이터를 지우도록 설정하는 것인데, 이러면 의도하지 않은 동작(그 주기 이전부터 읽어오게 되므로) 원하는 형태는 아닙니다.

[입 개발] Redis AOF Rewrite 설정에 관하여…

오래간만에 Redis 관련 블로깅을 하네요.(요새는 실력 부족이라… 적을 내용이…) 먼저 실제 내용 자체는 지인의 질문에 대한 답변을 해드리다가, 정리할 필요가 있어서 이렇게 적습니다.(그렇습니다. 나중에 제가 까먹을까봐지요. 그 때 소스 다시 보기는 그러니…)

Redis에서 Persist 를 제공하는 방식은 RDB/AOF 두 가지가 있습니다. 뭐, 당연히 이쯤 되면 다들 기본적으로 이게 뭔지는 아실테니… 자세한 내용은 넘어가고요. 원래 AOF는 Redis 의 Comment Handler가 처리하는 명령들 중에 write관련 커맨드들만, 디스크에 Redis Protocol 그대로 저장하는 방식입니다.(현재 이에 대해 압축을 해볼까 하는 얘기들도 오고가고 있습니다만… 일단 이건 논외로…)

그런데 로그중에 다음과 같은 에러를 보셨다고 합니다.

"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."

사실 Redis는 스레드를 몇 개 사용하긴 합니다. 그 중에 하나가 fsync를 스레드에서 실행하는 거죠. appendfsync 가 everysec 일 때만, 백그라운드로 돕니다. fsync 자체에서 대기하게 되면, redis가 아무작업도 못하게 되니…

appendfsync everysec

하여튼 fsync가 큐에 들어갔는데, 아직 실행이 안끝났다. 디스크가 바쁘거나, 뭔가 이슈가 있을꺼라라는 뜻이죠. 실제로 디스크가 좀 이상한 느낌이라고…

삼천포로 빠졌는데, 다시 AOF rewrite 얘기로 넘어가면, 실제로 모든 write 성 작업이 저장되기 때문에 AOF 파일이 한없이 커질 수가 있습니다. 그래서 특정시점에, 해당 파일 사이즈가 얼마 이상이면, rdb 생성 처럼 fork 후에 현재 메모리 정보를 전부 AOF 형식으로 다시 쓰게 됩니다.(그럼 사이즈가 줄어들 가능성이 있겠죠?, 변경이나 삭제가 한번도 없었다면, 줄어들지 않을수도…)

그럼 당연한 의문이 생깁니다. 어느 정도 되어야 rewrite가 일어날까? 관련 옵션이 바로 아래 두가지 입니다.

auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

일단 rewrite가 rdb 처럼 fork를 이용하므로 사용하지 않고 싶다면 위의 auto-aof-rewrite-percentage 값을 0으로 설정하면 됩니다. auto-aof-rewrite-min-size 는 일단 최소 이것보다는 사이즈가 커야 rewrite 하라는 뜻입니다.

내부적으로 코드를 살펴보면 다음과 같습니다.

         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }

내부적으로 aof_current_size 와 aof_rewrite_base_size 를 사용하는데… AOF를 처음 만들면… 마지막으로 만들어졌던 AOF 파일의 크기가 aof_rewrite_base_size 가 되고 현재 AOF 파일의 크기가 aof_current_size 가 됩니다. 즉 auto-aof-rewrite-percentage 이 100이면, 이전에 만들어진 AOF가 1G 였다면, 2G가 되면(즉 100% 커지면) 하라는 뜻이 됩니다. 그리고 위의 조건 문에 의해서 0이면 실제로 AOF rewrite는 동작하지 않습니다.

[입개발] Redis Command 구조체에 대해서…

오늘 올라온 Redis 버그 중에 클러스터에서 geo command가 redirect를 먹지 않는다 라는 버그가 있었습니다. 그런데 antirez가 아주 쉽게 버그를 수정하면서 패치가 되었다고 답변을 답니다.(이 버그는 cluster 모드일때만 발생합니다.) 그리고 해당 글은 꼭 끝까지 보셔야 합니다!!!

https://github.com/antirez/redis/issues/2671

그리고 그 패치라는 것도 다음과 같습니다.
https://github.com/antirez/redis/commit/5c4fcaf3fe448c5575a9911edbcd421c6dbebb54

     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
     {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
     {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
-    {"geohash",geohashCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"geopos",geoposCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"geodist",geodistCommand,-4,"r",0,NULL,0,0,0,0,0},
+    {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
+    {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
+    {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
     {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,1,1,0,0},

0, 0, 0 이 1, 1, 1 로만 바뀐거죠. 그럼 도대체 저 값들은 뭘 의미하는 걸까요? 그걸 위해서 이제 Redis Command 구조체를 알아보도록 하겠습니다.

먼저 Redis Command 구조체는 다음과 같습니다.

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};

먼저 위의 샘플 커맨드와 하나를 비교해보면

     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},

name 과 proc는 각각 명확합니다. name은 사용자로 부터 입력받는 명령어이고, proc는 해당 명령어에 연결된 함수포인터입니다. 즉 name으로 사용자가 입력하면 proc가 실행되는 것이죠.

이제 arity 부터가 영어를 알면 쉽게 알 수 있습니다. 그렇습니다. 영어를 모르면 어렵지만…(전 방금 사전을…)
다음과 같은 뜻이 있습니다.

(logic, mathematics, computer science) The number of arguments or operands a function or operation takes. For a relation, the number of domains in the corresponding Cartesian product.

즉 파라메터의 개수입니다. 다음 코드를 보시죠. arity가 양수일 때는 파라매터 개수랑 동일해야 하고(고정적), 음수이면, 절대값으로 이값보다는 많거나 같아야 하는거죠(동적). 즉 -2 면 파라매터가 2개 이상이어야 하는거고, 2면 딱 2개여야 하는겁니다.

    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

sflags는 명령어의 속성을 문자열로 나타냅니다. 각 속성은 populateCommandTable 함수에 잘 나타나 있습니다.

            case 'w': c->flags |= REDIS_CMD_WRITE; break;
            case 'r': c->flags |= REDIS_CMD_READONLY; break;
            case 'm': c->flags |= REDIS_CMD_DENYOOM; break;
            case 'a': c->flags |= REDIS_CMD_ADMIN; break;
            case 'p': c->flags |= REDIS_CMD_PUBSUB; break;
            case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
            case 'R': c->flags |= REDIS_CMD_RANDOM; break;
            case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
            case 'l': c->flags |= REDIS_CMD_LOADING; break;
            case 't': c->flags |= REDIS_CMD_STALE; break;
            case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
            case 'k': c->flags |= REDIS_CMD_ASKING; break;
            case 'F': c->flags |= REDIS_CMD_FAST; break;

flags는 값이 모두 0입니다. 이 값은 로딩시에 sflags 로 부터 계산해서 만들어집니다. 위의 populateCommandTable 함수를 보시면 sflags 속성에 따라서 c->flags를 만드는 코드가 바로 보입니다. 왜 이렇게 구현했을까요? 아마 숫자값으로 설정되어 있는것 보다, 문자열이 보기 쉬워서가 아닐까 합니다.

getkeys_proc 는 파라매터만 가지고는 key를 찾기가 힘들때, 보조적으로 사용하는 함수의 함수포인터입니다. 위의 예에서는 전부 NULL 이지만, 다음과 같은 커맨들들이 있습니다.

    {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},

zunionInterGetKeys 의 형태를 살펴보면 다음과 같습니다.

/* Helper function to extract keys from following commands:
 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    /* Keys in z{union,inter}store come from two places:
     * argv[1] = storage key,
     * argv[3...n] = keys to intersect */
    keys = zmalloc(sizeof(int)*(num+1));

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    /* Finally add the argv[1] key position (the storage key target). */
    keys[num] = 1;
    *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
    return keys;
}

그리고 getkeys_proc 를 쓰는 곳은 db.c 에서 getKeysFromCommand 에서 다음과 같이 사용하고 있습니다.

int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    if (cmd->getkeys_proc) {
        return cmd->getkeys_proc(cmd,argv,argc,numkeys);
    } else {
        return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
    }
}

그런데 재미있는 것은 getkeys_proc 는 실제로 코드는 예전부터 존재했으나, 거의 쓰이지 않던 파라매터라는 것입니다. 실제로 현재 버전에서도 commandCommand 와 cluster 정보를 얻기위해서만 쓰고 있습니다.

firstkey, lastkey, keystep 은 각각, 첫번째 파라매터가 key 된다. 마지막 파라매터가 key 가 된다. keystep은 1이면 [key, key, key] 형태고, 2이면 [key, val, key, val, key, val] 형태가 되는 것을 의미합니다. 그래서 이 값이, 0, 0, 0 이면 geohash가 다음과 같은 결과를 냅니다.

127.0.0.1:6379> GEOHASH Sicily Palermo Catania
(empty list or set)

위의 값이 1로 셋팅이 되면… 다음과 같이 redirect 결과가 나옵니다.

127.0.0.1:6381> GEOHASH Sicily Palermo Catania
(error) MOVED 10713 127.0.0.1:6380

물론 둘 다 해당 서버인 6380에서 실행하면 정상적인 결과가 나옵니다.

127.0.0.1:6380> GEOHASH Sicily Palermo Catania
1) "sqc8b49rny0"
2) "sqdtr74hyu0"

그런데 왜 이게 이런 차이가 날까요? 아까 살짝 언급한게 “getkeys_proc 는 commandCommand 와 cluster 정보를 얻기위해서만 쓰고 있습니다.” 라고 말했습니다. 저 firstkey, lastkey, keystep 도 상단의 getKeysUsingCommandTable 즉 getKeysFromCommand 에서만 사용되고 있습니다.

다시말하면, 결국 cluster 에서 뭔가 정보를 얻을때, firstkey, lastkey, keystep을 쓴다는 것입니다. 그냥 커맨드가 독립적으로 실행될 때는 안쓴다는 T.T

코드를 찾아보면 cluster.c 의 getNodeByQuery 에서 위의 getKeysFromCommand 를 쓰고 있습니다. 어떤용도로 쓰고 있을까요?

geohash 명령을 cluster 모드에서 받았다고 할때… 실제로 getKeysFromCommand 는 getKeysUsingCommandTable를 호출하게 되는데 cmd->firstkey 는 0이므로 NULL이 리턴됩니다. numkey도 0이 셋팅됩니다.

int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkey>
    int j, i = 0, last, *keys;
    REDIS_NOTUSED(argv);

    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }
    last = cmd->lastkey;
    if (last < 0) last = argc+last;
    keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
        redisAssert(j < argc);
        keys[i++] = j;
    }
    *numkeys = i;
    return keys;
}

그래서 getNodeByQuery에서는 다음과 같은 코드가 실행됩니다.

clusterNode *n = NULL
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);

//......

if (n == NULL) return myself;

즉 numkey가 0이라 노드를 찾는 작업을 하지 않고 n == NULL 이라서 자기자신에 속한다고 리턴해버립니다. 그래서 자기가 붙은 노드에서 검색하고 결과가 없어서 빈 값이 넘어가게 되는겁니다.

실제로 processCommand 소스를 보시면… 아래에 getNodeByQuery 를 호출하고, myself 가 아닐때에는 hashslot을 가진 서버로 clusterRedirectClient를 호출해서 보내게 되는데… 아까 위의 결과에서 myself가 리턴되기 때문에… 해당 서버에서 실행되게 되어서 결과가 없는 것입니다.

    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
            return REDIS_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_cod>
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    } 

딱, 이렇습니다라고 말해야 하지만… 사실 여기는 훼이크가 있습니다. 위의 processCommand의 코드를 자세히 살펴보면… 아래와 같은 코드가 있습니다.

    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)

네 그렇습니다. getkeys_proc 가 없고, firstkey가 0이 아니어야 아래의 코드를 타는 것입니다. 미리 조건을 걸러낸것입니다. 그래서 실제로… 위의 코드를 타지 않고… 아예 바로 해당 서버에서 실행됩니다. 즉 myself를 리턴한것과 동일한 효과가 나는 것이지요. firstkey가 0이면 파라매터가 없는 명령으로 인식되어서 그냥 해당 서버에서 실행이 되는 것입니다.

살짝 속으셨다고 분노하시겠지만, 결국은 해당 코드도 그렇게 동작하게 됩니다. 🙂 호호호호… 그러면 즐거운 하루되시길…

[입 개발] Redis internal : Redis Dictionary Type 에 관해서

Redis 는 기본적으로 Hash 형태로 데이터를 관리하지만, 내부적으로 관리가 필요한 정보들 역시, 내부적으로는 Dictionary 라고 부르는 Hash 형태로 관리하고 있습니다. 그런데 이 Dictionary 에 대한 핸들링이, 관리되는 데이터의 종류에 따라서 다르게 처리되어야 할 때가 있습니다. 아마 오늘 글을 그냥 Redis를 쓰시는 분 입장에서는 별 내용이 없고, Redis 소스를 건드리시는 분들에게는 아주 살짝 도움이 될듯합니다.

보통 대부분의 언어에서는 함수 오버라이딩등을 이용한 폴리모피즘을 이용해서 이런 방식의 요구사항을 좀 수월하게 처리하게 되어 있습니다. 그러나 C 에서는… 기본적으로 이런 방식이 제공되지 않지만, 함수 포인터를 이용해서 이런 방식을 구현할 수 있습니다. 가장 유명한 예가, 리눅스 커널의 VFS 같은 부분을 보면 됩니다.

cluster.c 의 clusterInit 코드를 보면 다음과 같이 dictCreate 함수를 이용해서 dictionary를 생성하는 것을 볼 수 있습니다.

    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);

dictCreate 함수를 살펴보면 다음과 같이 dictType, privDataPtr 두개의 파라매터를 가지고, 결과로 dict 를 넘겨줍니다.

dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
    return d;
}

먼저 dict 구조체부터 살펴보도록 하겠습니다.

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

dict 구조체는 dictType 과 dictht 두개를 가집니다. 여기서 dictht는 dict의 테이블 확장시에 사용하기 위한 것입니다. 그럼 이 dict 안에 있는 dictType은 뭘까요? dictType은 다음과 같이 정의되어 있습니다.

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

dictType 을 보시면 hashFuction, keyDup, valDup, keyCompare, keyDestructor, valDestructor 같은 값들이 정의되어 있습니다. 즉, 함수포인터를 가지고 있기 때문에, 여기서 가리키는 함수의 동작이 다르면, 같은 형태로 보이더라도 서로 다르게 동작하게 할 수 있습니다.

그리고 현재 Redis 에는 다음과 같이 8개의 dictType이 정의되어 있습니다.

  • dictType setDictType;
  • dictType clusterNodesDictType;
  • dictType clusterNodesBlackListDictType;
  • dictType dbDictType;
  • dictType shaScriptObjectDictType;
  • dictType hashDictType;
  • dictType replScriptCacheDictType;

dictType 구조체 안에서 특히 keyCompare 를 통해서 해당 키를 찾아내게 됩니다.
예를 들어서, 내부적으로 실제 키들을 저장하는 곳에서는 dbDictType 을 사용합니다. dbDictType을 보면 다음과 같습니다.

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictRedisObjectDestructor   /* val destructor */
};

dbDictType에서는 Key의 비교를 위해서는 dictSdsKeyCompare 를 사용하고, dictSdsKeyCompare는 다음과 같이 구현되어 있습니다.

int dictSdsKeyCompare(void *privdata, const void *key1,
        const void *key2)
{
    int l1,l2;
    DICT_NOTUSED(privdata);

    l1 = sdslen((sds)key1);
    l2 = sdslen((sds)key2);
    if (l1 != l2) return 0;
    return memcmp(key1, key2, l1) == 0;
}

그리고 Redis Command 를 저장하는 곳에서 사용하는 commandTableDictType 에서는 대소문자 구분이 필요없을 때는 dictSdsKeyCaseCompare 를 사용합니다.

int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    DICT_NOTUSED(privdata);

    return strcasecmp(key1, key2) == 0;
}

위와 같은 형태에서 dictFind를 살펴보면 내부적으로 dictType 함수포인터를 이용하게 됩니다.

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

#define dictCompareKeys(d, key1, key2) \
    (((d)->type->keyCompare) ? \
        (d)->type->keyCompare((d)->privdata, key1, key2) : \
        (key1) == (key2))

위의 dictCompareKeys가 내부적으로 dictType의 keyCompare를 이용하는 것을 볼 수 있습니다. 그런데 여기서 조심해야 하는 것들이 있습니다. 실제 dictFind 에서 key를 넘겨줄 때, dictSdsKeyCompare 는 key가 무조건 sds type이어야 하지만, dictSdsKeyCaseCompare 에서는 실제 값만 비교하므로, 단순 string 이어도 가능합니다. 말 그대로 가능만 합니다. 그런데, 뭔가 잘못 쓰기 시작하면… Redis가 뻥뻥 죽어나가는 걸 보실 수 있을껍니다.

그래서 cluster.c 소스를 보면, 무조건 key가 sds 형태여야 하기 때문에 발생하는 사소한 오버헤드도 존재합니다. 뭐, 그러나 자주 발생하지는 않으니… 그냥 패스를 ㅎㅎㅎ

[입개발] Redis Scan은 어떻게 동작할까? PART #3(결)

PART #1, PART #2를 보면 결국 Redis Scan에서의 Cursor는 bucket 을 검색해야할 다음 index 값이라고 볼 수 있습니다. 그런데 실제로 실행시켜보면, 0, 1, 2 이렇게 증가하지 않고…

그 이유중에 하나는… 실제 Cursor 값이 다음 index의 reverse 값을 취하고 있기 때문입니다. 이걸 보기 전에 먼저 다시 한번 Scan의 핵심 함수인 distScan을 살펴보도록 하겠습니다.(젤 뒤만 보면 됩니다.)

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de;
    unsigned long m0, m1;

    if (dictSize(d) == 0) return 0;

    if (!dictIsRehashing(d)) {
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;

        /* Emit entries at cursor */
        de = t0->table[v & m0];
        while (de) {
            fn(privdata, de);
            de = de->next;
        }

    } else {
      ......    
    }

    /* Set unmasked bits so incrementing the reversed cursor
     * operates on the masked bits of the smaller table */
    v |= ~m0;

    /* Increment the reverse cursor */
    v = rev(v);
    v++;
    v = rev(v);

    return v;
}

한 이터레이션이 끝나고 나면 m0 의 bitwise NOT을 or 하고 reverse를 취한 다음 1을 더하고 다시 reverse를 취합니다. 일단 bucket이 4개만 있다고 가정하고, rehashing은 빼고 생각해보도록 합니다. 먼저 여기서 reverse는 비트를 쭈욱 세워놓고, 그걸 거꾸로 뒤집는 것입니다.
그래서 0의 rev(0) 은 그대로 0이고, rev(1)은 8000000000000000(16진수), rev(2)는 4000000000000000(16진수) 가 됩니다.(아 이걸 출력을 64bit 라 64bit hex로 찍어야 하는데 32bit로 찍었다가… 잘못된 이해를 ㅋㅋㅋ)

처음에는 v(cursor) 가 0입니다. scan이 끝나고 (0 |= ~3) = -4, 그 뒤에 rev(-4)는 3fffffffffffffff(16진수) 가 됩니다. 여기에 1을 더하면 4000000000000000 여기서 다시 rev(4000000000000000)가 되면 2가 나오게 됩니다.

/* Function to reverse bits. Algorithm from:
 * http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel */
static unsigned long rev(unsigned long v) {
    unsigned long s = 8 * sizeof(v); // bit size; must be power of 2
    unsigned long mask = ~0;
    while ((s >>= 1) > 0) {
        mask ^= (mask << s);
        v = ((v >> s) & mask) | ((v << s) & ~mask);
    }
    return v;
}

그런데 왜 reverse를 취하는 것일까요? 이것은 실제 적으로 1씩 증가하는 형태라면… cursor가 언제 끝나는지 알려주기가 애매해서 입니다. 즉 끝났다는 값을 다시 줘야 하는데, 그것보다는 0으로 시작해서 다시 0으로 끝날 수 있도록 reverse 형태를 취하는 것이죠.

PART #1, PART #2, PART #3 의 이유로 해서 SCAN은 다음과 같은 제약 사항을 가집니다.
1. count 값을 줄 수 있지만, 딱 그 개수를 보장하지 않는다.
2. 이미 scan 이 지나간 인덱스에 있는 index 에 나중에 추가된 아이템은 iteration 중에 데이터가 나오지 못한다.(Cursor가 이미 지나갔으므로…)
3. 해당 코드의 설명을 보면… 몇몇 데이터가 중복 될 수 있다는데… 이 부분은 저도 잘 이해가 안가는… 코멘트에 보면… hash table이 확장될때, 줄어들 때, rehashing 할 때 다시 스캔하지 않는다고 되어있는데… 이 부분은 잘 모르겠네요. ㅎㅎㅎ

그럼 Redis Scan 에 대한 부분은 마치도록 하겠습니다.

[입개발] Redis Scan은 어떻게 동작할까? PART #2

지난 Part #1 에서는 기본적인 Redis 의 Scan 동작과 테이블에 대해서 알아보았습니다. 이번에는 Redis Scan의 동작을 더 분석하기 위해서 기본적으로 Redis Hash Table의 Rehash 과 그 상황에서 Scan이 어떻게 동작하는지 알아보도록 하겠습니다.

먼저, Redis Hash Table의 Rehashing에 대해서 알아보도록 하겠습니다. 전 편에서도 간단하게 언급했지만 Redis Hash Table은 보통 Dynamic Bucket 에 충돌은 list 로 처리하는 방식입니다.

redis_hash_1

처음에는 4개의 Bucket으로 진행하면 Hash 값에 bitmask를 씌워서 Hash Table 내의 index를 결정합니다. 그런데, 이대로 계속 데이터가 증가하면, 당연히 충돌이 많고, List가 길어지므로, 탐색 시간이 오래걸리게 되어서 문제가 발생합니다. Redis는 이를 해결하기 위해서 hash table의 사이즈를 2배로 늘리는 정책을 취합니다.

redis_hash_expand

2배로 테이블이 늘어나면서, bitmask는 하나더 사용하도록 됩니다. 이렇게 테이블이 확장되면 Rehash를 하게 됩니다. 그래야만 검색시에 제대로 찾을 수 있기 때문입니다. 먼저 Table을 확장할 때 사용하는 것이 _dictExpandIfNeeded 합수입니다. dictIsRehashing는 이미 Rehash 중인지를 알려주는 함수이므로, Rehashing 중이면 이미 테이블이 확장된 상태이므로 그냥 DICT_OK를 리턴합니다.

먼저 hash table에서 hash table의 사용 정도가 dict_force_resize_ratio 값 보다 높으면 2배로 확장하게 됩니다.

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

실제로 _dictExpandIfNeeded 는 _dictKeyIndex 함수에서 호출하게 됩니다. 이렇게 테이블이 확장되면 Rehash를 해야 합니다. Rehash 라는 것은 테이블의 Bucket 크기가 커졌고 bitmask 가 달라졌으니… mask 0011이 전부 3번째 index였다면 이중에서 111은 7번째로, 011은 3번째로 옮기는 것입니다. 여기서 Redis의 특징이 하나 있습니다. 한꺼번에 모든 테이블을 Rehashing 해야 하면 당연히 시간이 많이 걸립니다. O(n)의 시간이 필요합니다. 그래서 Redis는 rehash flag와 rehashidx라는 변수를 이용해서, hash table에서 하나씩 Rehash하게 됩니다. 즉, 확장된 크기가 8이라면 이전 크기 총 4번의 Rehash 스텝을 통해서 Rehashing이 일어나게 됩니다. (이로 인해서 뒤에서 설명하는 특별한 현상이 생깁니다.)

그리고 현재 rehashing 중인것을 체크하는 함수가 dictIsRehashing 함수입니다. rehashidx 가 -1이 아니면 Rehashing 중인 상태입니다.

#define dictIsRehashing(d) ((d)->rehashidx != -1)

그리고 위의 _dictExpandIfNeeded 에서 호출하는 실제 hash table의 크기를 증가시키는 dictExpand 함수에서 rehashidx를 0으로 설정합니다.

/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

위의 함수를 잘 살펴보면 dict 구조체 안의 ht[1] = n으로 할당하는 코드가 있습니다. 이 얘기는 hash table이 두 개라는 것입니다. 먼저 dict 구조체를 살펴보면 다음과 같습니다.

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

실제로, redis의 rehashing 중에는 Hash Table이 두개가 존재합니다. 이것은 앞에 설명했듯이… 한번에 rehash step이 끝나지 않고, 매번 하나의 bucket 별로 rehashing을 하기 때문입니다. 즉 hash table의 확장이 일어나면 다음과 같이 두 개의 hash table 이 생깁니다.

redis_hash_expand_1

그리고 한스텝이 자나갈 때 마다 하나의 Bucket 단위로 해싱이 됩니다. 즉 첫번째 rehash step에서는 다음과 같이 ht[0]에 있던 데이터들이 ht[1]으로 나뉘어서 들어가게 됩니다.

redis_hash_rehash_1

두 번째, 세 번째, 네 번째 rehash 스텝이 끝나면 완료되게 됩니다.

redis_hash_rehash_2

그럼 의문이 생깁니다. Rehashing 중에 추가 되는 데이터는? 또는 삭제나 업데이트는? 추가 되는 데이터는 이 때는 무조건 ht[1]으로 들어가게 됩니다.(또 해싱 안해도 되게…), 두 번째로, 검색이나, 업데이트는, 이 때 ht[0], ht[1]을 모두 탐색하게 됩니다.(어쩔 수 없겠죠?)

dictRehash 함수에서 이 rehash step을 처리하게 됩니다. dictRehash 함수의 파라매터 n은 이 스텝을 몇번이나 할 것인가 이고, 실제로 수행할 hash table의 index는 함수중에서 ht[0]의 table이 NULL인 부분을 스킵하면서 찾게 됩니다. 그리고 ht[0]의 used 값이 0이면 rehash가 모두 끝난것이므로 ht[1]을 ht[0]로 변경하고 rehashidx를 다시 -1로 셋팅하면서 종료하게 됩니다.

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table. */
int dictRehash(dict *d, int n) {
    if (!dictIsRehashing(d)) return 0;

    while(n--) {
        dictEntry *de, *nextde;

        /* Check if we already rehashed the whole table... */
        if (d->ht[0].used == 0) {
            zfree(d->ht[0].table);
            d->ht[0] = d->ht[1];
            _dictReset(&d->ht[1]);
            d->rehashidx = -1;
            return 0;
        }

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    return 1;
}

이제 다시 scan으로 돌아오면… Rehashing 중의 dictScan 함수는 다음과 같습니다.

    } else {
        t0 = &d->ht[0];
        t1 = &d->ht[1];

        /* Make sure t0 is the smaller and t1 is the bigger table */
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }

        m0 = t0->sizemask;
        m1 = t1->sizemask;

        /* Emit entries at cursor */
        de = t0->table[v & m0];
        while (de) {
            fn(privdata, de);
            de = de->next;
        }

        /* Iterate over indices in larger table that are the expansion
         * of the index pointed to by the cursor in the smaller table */
        do {
            /* Emit entries at cursor */
            de = t1->table[v & m1];
            while (de) {
                fn(privdata, de);
                de = de->next;
            }

            /* Increment bits not covered by the smaller mask */
            v = (((v | m0) + 1) & ~m0) | (v & m0);

            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));
    }

실제로 이미 Rehashing이 된 bucket의 경우는 ht[0] 작은 hash table에는 이미 index의 값이 NULL이므로 실제로 돌지 않지만, 아직 rehash되지 않은 bucket의 경우는 ht[0] 와 ht[1] 의 두 군데, 즉 총 세 군데에 데이터가 존재할 수 있습니다. 그래서 먼저 ht[0]의 bucket을 돌고 나서, ht[1]을 찾게 됩니다. 여기서 당연히 ht[1]에서는 두 군데를 검색해야 하므로 두 번 돌게 됩니다.

v = (((v | m0) + 1) & ~m0) | (v & m0);

즉 위의 식은 만약 v가 0이고 m0 = 3, m1 = 7이라고 하면… (((0 | 3) + 1) & ~3) | (0 & 3) 이 되게 됩니다. ~3은 Bitwise NOT 3이 되므로 -4 가 나오고, (4 & -4) | 0 이므로 결론은 4 & -4 입니다. 3은 00000011, bitwise NOT하면 11111100 이 되므로, 즉 다시 풀면, 00000100 & 11111100 해서 00000100 즉 4가 나오게됩니다. 처음에는 index 0, 두번째는 index 4 가 되는 거죠. 그래서 첫 루프를 돌게 됩니다. 다시 4 & (m0 ^ m1) == 4 이므로 …

이제 두 번째 루프에서 다시 (((4 | 3) + 1) & -4) | (4 & 3) 이므로… 4 | 3 = 7, 4 & 3 = 0 이므로… 다시 한번 정리하면 ((7+1) & -4) | 0 이므로 결론은 8 & -4 = 4 가 되므로 00001000 & 111111100 이 되므로 v 는 이번에는 00001000 즉 8이 됩니다. 즉 한번 돌 때 마다, ht[0]의 size 만큼 증가하게 됩니다.(다들 한방에 이해하실 텐데… 이걸 설명한다고…) 그래서 그 다음번에는 8 & 4 가 되므로 루프가 끝나게 됩니다. 즉, 0, 4 이렇게 ht[1]에서 두 번 읽어야 하니, 두 번 읽는 코드를 만들어둔거죠.

이제 다음편에는 cursor 가 어떻게 만들어지는가에 대해서 간단하게 설명하도록 하겠습니다. (다음편은 짧을듯…)

[입 개발] Redis Scan은 어떻게 동작할까? PART #1

처음부터 꾸준히 입만 놀리는 입개발(or 혀로그래머) CharSyam입니다. Redis의 기능중에 사람들이 쓰면 안되지만, 그 단맛에 끌려 어쩔 수 없이 치게 되는 명령이 KEYS 입니다. KEYS를 쓰는 순간, Redis는 이 명령을 처리하기 위해서 멈춰버립니다. 특히 트래픽이 많은 서버는… 이 KEYS 명령 하나에 많은 장애를 내게 됩니다. 그런데… 어느 순간… Redis에 Scan이라는 명령이 생겼습니다. KEYS의 단점을 없애면서도, 느리지 않은… 그렇다면, Redis에서는 어떻게 Scan 이 동작하게 될까요?

Scan의 내부 동작을 알기 전에… 먼저 Redis가 어떻게 데이터를 저장하는지 부터 다시 한번 집고 넘어가야 합니다. Redis 의 가장 기초적인 자료구조는 KV 즉 Key/Value 형태를 저장하는 것입니다.(String 타입이라고도 합니다.) 이를 위해 Redis는 Bucket을 활용한 Chained Linked List 구조를 사용합니다. 최초에는 4개의 Bucket에서 사용하여… 같은 Bucket에 들어가는 Key는 링크드 리스트 형태로 저장하는 거죠. 즉 다음 그림과 같습니다.

redis_hash_1

이 Chained Linked List에는 다음과 같은 약점이 있습니다. 즉 한 Bucket 안에 데이터가 많아지면 결국 탐색 속도가 느려집니다. 이를 위해서 Redis는 특정 사이즈가 넘을때마다 Bucket을 두 배로 확장하고, Key들을 rehash 하게 됩니다. 먼저 이 때 Key의 Hash로 사용하는 해시함수는 다음과 같습니다. MurmurHash2를 사용합니다.

/* MurmurHash2, by Austin Appleby
 * Note - This code makes a few assumptions about how your machine behaves -
 * 1. We can read a 4-byte value from any address without crashing
 * 2. sizeof(int) == 4
 *
 * And it has a few limitations -
 *
 * 1. It will not work incrementally.
 * 2. It will not produce the same results on little-endian and big-endian
 *    machines.
 */
unsigned int dictGenHashFunction(const void *key, int len) {
    /* 'm' and 'r' are mixing constants generated offline.
     They're not really 'magic', they just happen to work well.  */
    uint32_t seed = dict_hash_function_seed;
    const uint32_t m = 0x5bd1e995;
    const int r = 24;

    /* Initialize the hash to a 'random' value */
    uint32_t h = seed ^ len;

    /* Mix 4 bytes at a time into the hash */
    const unsigned char *data = (const unsigned char *)key;

    while(len >= 4) {
        uint32_t k = *(uint32_t*)data;

        k *= m;
        k ^= k >> r;
        k *= m;

        h *= m;
        h ^= k;

        data += 4;
        len -= 4;
    }

    /* Handle the last few bytes of the input array  */
    switch(len) {
    case 3: h ^= data[2] << 16;
    case 2: h ^= data[1] << 8;
    case 1: h ^= data[0]; h *= m;
    };

    /* Do a few final mixes of the hash to ensure the last few
     * bytes are well-incorporated. */
    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;

    return (unsigned int)h;
}

그리고 hash 값이 들어가야 할 hash table 내의 index를 결정하는 방법은 다음과 같습니다.

/* Returns the index of a free slot that can be populated with
 * a hash entry for the given 'key'.
 * If the key already exists, -1 is returned.
 *
 * Note that if we are in the process of rehashing the hash table, the
 * index is always returned in the context of the second (new) hash table. */
static int _dictKeyIndex(dict *d, const void *key)
{
    ......
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        ......
    }
    return idx;
}

table에는 Key를 찾기위해 비트 연산을 하기 위한 sizemask가 들어가 있습니다. 초기에는 table의 bucket이 4개 이므로 sizemask는
이진수로 11 즉 3의 값이 셋팅됩니다. 즉 해시된 결과 & 11의 연산결과로 들어가야 하는 Bucket이 결정되게 됩니다.

여기서 Key 가 많아지면 Redis는 Table의 사이즈를 2배로 늘리게 됩니다. 그러면 당연히 sizemask도 커지게 됩니다. Table size가 8이면 sizemask는 7이 됩니다.

먼저 간단하게 말하자면, Scan의 원리는 이 Bucket을 한 턴에 하나씩 순회하는 것입니다. 그래서 아래 그림과 같이 처음에는 Bucket Index 0를 읽고 데이터를 던져주는 것입니다.

redis_scan_0

        t0 = &(d->ht[0]);
        m0 = t0->sizemask;

        /* Emit entries at cursor */
        de = t0->table[v & m0];
        while (de) {
            fn(privdata, de);
            de = de->next;
        }

다음 편에서는 실제 사용되는 cursor 가 어떤 식으로 만들어지는 지, 그 외의 예외 케이스는 어떤 것이 있는지… 그리고 scan 을 사용할 때 주의사항에는 어떤 것이 있는지 알아보도록 하겠습니다.(이거 적고, 안 쓸지도 ㅎㅎㅎ)

[Python] Simple Fabric 코드…

간단하게 Fabric 을 좀 봤는데… 좋네요. 다음과 같은 형태로 사용할 수 있는데, 다음과 같이 사용할 수 있습니다.

sudo pip install fabric
from fabric import tasks
from fabric.state import env
from fabric.api import run

def uptime():
    res = run('uptime')
    return res

ret = tasks.execute(uptime)
print ret

위에서 uptime() 에서 return 을 한 결과가 밑에 json 형태로 보여줍니다. 그것들이 tasks.execute의 결과에
호스트별로 맵 형태로 결과가 넘어옵니다.

[localhost] Executing task 'uptime'
[localhost] run: uptime
[localhost] out: 22:13  up 16 days, 21:25, 9 users, load averages: 1.55 1.88 1.96
[localhost] out:

{'localhost': '22:13  up 16 days, 21:25, 9 users, load averages: 1.55 1.88 1.96'}

[입 개발] Memcached 의 delete 에 대한 변천사…

최근에 twemproxy 에서 delete 가 제대로 안된다는 제보를 듣고, 코드를 살펴보길 시작했었는데요. 여기서 재미난 것을 발견해서 포스팅을 할려고 합니다.(오래간만의 포스팅이네요.)

먼저 최근의 memcached 소스의 delete 를 보면 다음과 같습니다. 코드를 보면 ntokens 가 3보다 클 경우, 뒤에 0이 나오거나 noreply 가 있을때만 허용이 되고 있습니다.

즉 다음과 같은 경우가 허용이 됩니다. 다만 number는 0만 허용이 됩니다.

delete <key>
delete <key> <number>
delete <key> <noreply>
delete <key> <number> <noreply>

다음 코드를 보면… 쉽게 이해가 되실껍니다.

static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
    char *key;
    size_t nkey;
    item *it;

    assert(c != NULL);

    if (ntokens > 3) {
        bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, "0") == 0;
        bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
        bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
            || (ntokens == 5 && hold_is_zero && sets_noreply);
        if (!valid) {
            out_string(c, "CLIENT_ERROR bad command line format.  "
                       "Usage: delete <key> [noreply]");
            return;
        }
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if(nkey > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    if (settings.detail_enabled) {
        stats_prefix_record_delete(key, nkey);
    }

    it = item_get(key, nkey);
    if (it) {
        MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);

        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        item_unlink(it);
        item_remove(it);      /* release our reference */
        out_string(c, "DELETED");
    } else {
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.delete_misses++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        out_string(c, "NOT_FOUND");
    }
}

그런데 java의 spymemcached 나 ruby 라이브러리를 보면 실제 위의 파트를 보내지
않습니다. 그런데 python의 경우는 time을 지정할 경우는 값이 전달되면서 실제로
memcached에서 delete가 실패하게 됩니다. 디폴트로는 0이 전달되게 되므로 사실 큰 문제는 없습니다.

    def delete_multi(self, keys, time=0, key_prefix='', noreply=False):
        """Delete multiple keys in the memcache doing just one query.
        >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
        >>> mc.get_multi(['a1', 'a2']) == {'a1' : 'val1','a2' : 'val2'}
        1
        >>> mc.delete_multi(['key1', 'key2'])
        1
        >>> mc.get_multi(['key1', 'key2']) == {}
        1
        This method is recommended over iterated regular L{delete}s as
        it reduces total latency, since your app doesn't have to wait
        for each round-trip of L{delete} before sending the next one.
        @param keys: An iterable of keys to clear
        @param time: number of seconds any subsequent set / update
        commands should fail. Defaults to 0 for no delay.
        @param key_prefix: Optional string to prepend to each key when
            sending to memcache.  See docs for L{get_multi} and
            L{set_multi}.
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @return: 1 if no failure in communication with any memcacheds.
        @rtype: int
        """

        self._statlog('delete_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            keys, key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []

        rc = 1
        for server in six.iterkeys(server_keys):
            bigcmd = []
            write = bigcmd.append
            extra = ' noreply' if noreply else ''
            if time is not None:
                for key in server_keys[server]:  # These are mangled keys
                    write("delete %s %d%s\r\n" % (key, time, extra))
            else:
                for key in server_keys[server]:  # These are mangled keys
                    write("delete %s%s\r\n" % (key, extra))
            try:
                server.send_cmds(''.join(bigcmd))
            except socket.error as msg:
                rc = 0
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if noreply, just return
        if noreply:
            return rc

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        for server, keys in six.iteritems(server_keys):
            try:
                for key in keys:
                    server.expect("DELETED")
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                rc = 0
        return rc

그런데 이게 twemproxy 에서는 문제를 일으킵니다. 현재 twemproxy는 다음과 같은 룰만 허용합니다.

delete <key>

즉 뒤에 0이 붙으면… 그냥 잘라버립니다. T.T 그래서 실제로 python-memcache를 쓸 경우, delete_multi를 하면 실제 delete가 안되는 거죠. 흑흑흑…

그런데… 여기서 이유를 찾은게 문제가 아니라… 왜 이 코드가 있을까요? 이 의문을 가진건 위의 memcached 코드에 0이 아니면 에러가 나게 된 코드가… github을 보면… 이 코드가 2009년 11월 25일에 커밋된 코드라는 겁니다.(지금이 2014년인데!!!)

그래서 memcached 코드를 1.4.0 부터 현재 버전까지 뒤져봤습니다. 해당 코드는 그대로입니다. -_- 약간 변경이 있긴한데… 위의 time을 쓰는 코드는 아니었습니다. 그럼 이건 뭐지 할 수 있습니다.

실제로 1.2.7 소스를 보니 defer_delete 라고 해서 시간을 주면 해당 시간 뒤에 삭제되는 명령이 있었습니다. 정확히는 to_delete라는 곳에 넣어두고, item의 expire time을 지워져야할 시간으로 셋팅해주는 겁니다.

그러니 그 관련 코드가 아직까지도 python-memcache에 생존해 있던 것이죠 T.T 흑흑흑, 아마 현재는 거의 아무도 안쓰지 않을가 싶은… 다른 언어도 만들어서 쓸 수 있지만… 최신 버전의 memcached에서는 불가능 하다는…

그냥 이런 이슈가 있다는 걸… 알고 넘어가시면 좋을듯 합니다. 실제로 delete key time noreply 로 현재의 memcached에 호출하면, 실제로는 내부적으로 에러로 처리되서 실제로 안지워지지만, 응답은 먹히는 버그가 있습니다만… 이건 noreply 명령이 여러개일때 실제로 어는것이 에러가 난지를 알 수가 없기 때문에 그냥 known issue로 하고… 못 고친다는…