[책 리뷰] 비주얼 컴플렉시티

해당 리뷰는 “한빛미디어”에서 제공하는 도서를 이용하여 리뷰를 진행하였습니다.

 

사실 책을 받기 전에 제목과 샘플만 보고 느낀점은 “꼭 읽어봐야 할 것 같다” 뭔가 도움이 되겠구나 라는 느낌이었습니다.

그러나 책을 받고 실제로 읽어보면서.. 제 머리속에서 뭔가 큰 소리가 들렸습니다.
“큰일났다… 엄청 어렵구나…”

이 책의 제목처럼 비주얼 컴플렉시티는 “복잡한 정보를 효과적으로 표현하는 놀라운 시각화 기법” 이라는 부제를 가지고 있는데요. 그러나… 실제로 관련 지식이 없는 사람에게는 일종의 시각화에 대한 철학책 처럼 보일 수 있습니다.(네, 저한테 이렇게 보인다는 거죠.)

여러가지 주제와 이 내용을 어떻게 표현할지에 대한 내용들도 있고, 실제로 이런 정보를 이렇게 표현했다라는 부분도 있습니다. 예를 들어… 중간에 테러리스트를 표시하는 부분은 여러가지 이미지로(네트웍을 표현하는 이미지들) 해당 내용을 표시합니다. 물론 실제 나타내는 의미나 주제도 다 다르죠.

visual_1

네 이런 정보가 가득합니다. 솔직하게 제가 이쪽 분야에 대해서 전혀 알지를 못하기 때문에… 책을 그림 책 보듯이 읽을 수 밖에 없었습니다. 그냥 이렇게도 표현할 수 있다라는 느낌으로… 다만… 그림책으로 보더라도 상당히 코퀄리티의 느낌을 받을 수 있습니다. 이렇게 그래픽으로 대부분 표현해야 하는 내용들이다 보니… 전부 컬러라는…

다만 순수 기술쪽만 보다가 이런 종류의 책을 보니… 이해하지는 못해도… 뭔가 멋져 보인다는 것만… (이번 리뷰는 망했어요. T.T)

저에게 좌절만 안겨준 책이지만… 반대로 이런 식으로 데이터를 표현한다는 것은… 좀 색다른 경험이긴 합니다.

[책 리뷰] 클라우드 시스템을 관리하는 기술

해당 리뷰는 “한빛미디어”에서 제공하는 도서를 이용하여 리뷰를 진행하였습니다.

딱 책을 받았을 때의 첫 느낌은… “어렵겠다” 였습니다. 일단 목차만 봐도 실제로 굉장히 많은 내용을 담고 있는 것으로 보이고, 저자가 구글에서 “Site resilience Engineer” 로 일했기 때문에, 구글 내부의 특별한 기술들을 가지고 있지 않을까 생각했습니다.(외국은 업무에 따라서 부르는 명칭이 많이 다른데, 위의 직종은 시스템이 장애가 나더라도 잘 운영될 수 있도록 하거나, 서비스의 확장에 무리없도록 해주는 서버 엔지니어를 말합니다. – DevOps 쪽 역할이라고 할수 있습니다.)

일단 내용 자체는 첫 느낌 그대로 방대하고 어렵습니다입니다. 저 역시 서버 엔지니어로 경력이 조금 있긴 하지만, 한번에 제대로 이해하기는 어려운 내용들이었습니다.(실제 내용이 Software Engineer 보다는 System Engineer 쪽에 좀 더 취중해 있는 내용이라, 어떻게 보면 모르는게 당연합니다.)

Global DNS 시스템을 구성하는 방법이라든지…(Kakao의 Anycast 와 유사합니다. http://tech.kakao.com/2014/05/29/anycast/) 또는 소프트웨어 로드 밸런서가 좋을지 하드웨어 로드 밸런서가 좋을지 등등 굉장히 여러부분을 다루고 있습니다.

이쪽 계통에서 일을 하고 있는 사람이나, 관심이 있는 사람이나 충분히 읽어보면 꽤 도움이 될만한 서적입니다.(초급자보다는 어느정도 지식이 있는 사람들에게 좀 더 좋을듯 합니다.) 다만, 워낙 많은 내용을 다룰려고 하다보니, 아주 상세한 내용보다는, 이런것들을 이런데 쓴다 정도의 개념만 설명하고 넘어가는 경우가 많습니다.

읽다보면 “사례 연구” 라고 해서 회색으로 표시된 부분들이 있는데, 자세히 읽어보고 넘어가시는게 좋습니다. 실제로 대규모 서비스를 운영하는 곳에서, 어떤 문제들이 있는지에 대해서 아주 가볍게 말하고 넘어가는데… 꼭 알아두어야 하는 부분들이지만, 반대로… 왜 그런가에 대한… 이유가 충분히 나오지는 않는… 경우도, 또한 구글의 엔지니어였기 때문에, 구글 수준에서는 쉽게 해결할 수 있겠지만, 일반적인 경우에는 해결하기 어려운 문제들도 꽤 있습니다.

결론적으로 내용이 방대하고 어려운부분(기반지식이 없다면…) 이 이 책의 장점이자, 단점이지 않을까 싶습니다. 그러나, 실제로 이정도 규모의 서비스를 경험해보기가 쉽지 않기 때문에, 그리고 이정도로 방대하게 풀어낸 책은 아직 없는듯 합니다. 꼭 이해하지 못하더라도 한번 읽어보고, 이해할려고 노력해보는 것이 필요한 책입니다.

[입 개발] Redis Cluster 에서의 Pub/Sub은 어떻게 동작할까?

저도 잘 모르고 있다가, 오늘 질문을 받고 급하게 찾아보고 아는 척을 한게 있습니다. 저야 뭐, 레디스 클러스터를 잘 안쓰고 있어서… 예전에 살짝 살펴보고 관심이 많이 떨어져 있기도 한…(비겁한 변명입니다.)

오늘의 질문은 “Redis Cluster 에서 Pub/Sub이 동작하는가?” 였습니다. 아마 다들 아시고 있으실 것 같지만… 일단 답 부터 하면 “동작합니다. 아무 노드에서나 subscribe 하고 아무 노드에서나 publish 하면 다 받을 수 있습니다.” 가 답입니다.

그럼 어떻게 구현이 되어 있을가요?

먼저 subscribe 동작 부터 살펴보면 subscribe 동작은 기존의 redis 와 동일합니다.

void subscribeCommand(client *c) {
   int j;

   for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
        c->flags |= CLIENT_PUBSUB;
   }
}

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client->channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel->list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

즉 클라이언트는 클러스터내의 아무(마스터중에서) Redis 서버에 접속해서 subscribe 명령을 날립니다.
그러면 해당 Redis 서버에는 server.pubsub_channels 라는 dict 안에 해당 channel 이 생성되게 됩니다.

이제 중요한 부분은 Publish 부분입니다. 그러나 간단합니다. 사실 Redis Cluster Spec 이나 github issue #1927 이런 걸 보면 Cluster Bus니 하면서 굉장히 복잡하게 보이지만… 그냥 publish는 모든 마스터에 이 채널에 이런 메시지가 왔다라고 전달하게 됩니다. 그러면 해당 서버는 다시 해당 채널에 등록된 클라이언트들에게 메시지를 보내는 아주 간단한 구조입니다.

void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

void clusterPropagatePublish(robj *channel, robj *message) {
    clusterSendPublish(NULL, channel, message);
}

void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    unsigned char buf[sizeof(clusterMsg)], *payload;
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    uint32_t channel_len, message_len;

    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = buf;
    } else {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
    memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
        message->ptr,sdslen(message->ptr));

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != buf) zfree(payload);
}

void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        clusterSendMessage(node->link,buf,len);
    }
    dictReleaseIterator(di);
}

아래로 내려가면 최종적으로 clusterSendMessage를 모든 노드에 보내는 것을 알 수 있습니다. 맨 마지막 코드에서 보면 자기자신은 제외하는데, 이미 젤 위의 publishCommand 에서 자기 자신에 존재하는 채널을 찾아서 이미 보내고 있습니다.

그래서 결국 아무 Redis 서버에나 subscribe를 하고 아무 Redis 서버에서 publish를 하면 pub/sub 이 동작합니다. 그런데!!! 그런데!!! 그런데!!! 과연 이런 구조가 아무런 문제가 없을까요? 하나 더 집어드리자면… 간단하게 생각해서 A노드에만 subscribe 된 B라는 채널이 있고 마스터가 10대라면… 의미없는 9대의 서버에도 현재는 모두 메시지를 전달해야 합니다. 밴드위스 낭비와, 노드가 많아질때마다 성능상 문제가 있을 수도 있습니다. 여기에 대해서 걱정하는 글들도 있긴합니다만, 현재까지는… 좋은 방법이 구현되어 있지는 않습니다. 일단 해당 관련 내용은 다음 글들을 참고해보시기 바랍니다.

issue 2672
issue 122

[책 리뷰] 초보자를 위한 안드로이드 스튜디오

[해당 리뷰는 한빛미디어에서 진행하는 한빛리더스모임에 의해서 제공받은 책으로 진행했습니다.]

 

먼저 이 책은, “따라하기” 식으로 구성되어 있습니다. 처음부터, 끝까지 따라가면 안드로이드 스튜디오에 대한 기본적인 지식을 익히면서, 간단한(정말 간단하지는 T.T) 앱들을 구현하고, 실제 구글스토어에 올려볼 수 있습니다.

따라하기 식이다 보니, 안드로이드 프로그래밍 자체에 대해서 깊은 지식을 알려주지는 않지만 반대로,  쉽게 간단한 프로그램을 만들 수 있습니다.(좀 더 깊은 지식이 필요하다면 안드로이드 프로그래밍 전문서를 읽는 것을 추천합니다.)

그런데 뭔가 이상한걸 느끼지 않으셨나요? 이 책의 제목은 “초보자를 위한 안드로이드 스튜디오” 입니다. 제목만 보면 안드로이드 스튜디오만 설명할 것 같지만… 실제로 책의 앞부분에는 안드로이드 스튜디오를 이용한 방법이 많이 설명되고 있고, 책 전체적으로 따라하기가 안드로이드 스튜디오를 통해서 이루어지고 있습니다. 그렇지만, 안드로이드 개발에 대해서도 다루고 있는 책입니다.

책의 원서를 찾아보면, 실제로 제목이 안드로이드 스튜디오를 이용한 안드로이드 개발의 교과서 라는 이름입니다.(읽다보니, 웬지 개발서적인데… 이름이 이상해서 결국 인터넷 검색을 통해서 원서를 찾아봤습니다. 역시… 원서이름은… 둘다 포함하는)

원서는 여기서(http://www.amazon.co.jp/Android-Studio%E3%81%A7%E3%81%AF%E3%81%98%E3%82%81%E3%82%8BAndroid%E3%82%A2%E3%83%97%E3%83%AA%E9%96%8B%E7%99%BA%E3%81%AE%E6%95%99%E7%A7%91%E6%9B%B8-%EF%BD%9EAndroid-Studio-%E6%95%99%E7%A7%91%E6%9B%B8%E3%82%B7%E3%83%AA%E3%83%BC%E3%82%BA/dp/483995643X/ref=sr_1_7?ie=UTF8&qid=1456153918&sr=8-7&keywords=Android+Studio)

그렇다고, 완전 초보자용 도서도 아닙니다. 내용을 제대로 이해하기 위해서는
안드로이드에 대한 기본적인 지식이 필요합니다. 실제 예제도 채팅 클라이언트와 간단한 벽돌깨기 같은 게임을 만드는 예제가 있습니다.

실제로 예제를 설명하면서 필요한 지식도 어느정도 잘 설명하고 있습니다.(일본 서적들이 대부분 이런 부분에서 강점을 보여주더군요.) 채팅앱의 경우에, 사실 네트웍 동작이나, 실제 내부 저장같은 부분은 다 빠져있고, 실제 화면에 보이는 부분들만 있는게 좀 아쉽긴 합니다. 반대로 음성인식 API 사용이 있으니 이건 좋네요.

마지막으로 해당 앱을 구글스토어에 어떻게 올리는지에 대해서도 설명하고 있습니다. 코드 사이닝을 하고, 각 해상도에 맞는 아이콘을 등록하고, 이미지를 올리는… 이런게 귀찮지만, 꼭 알아둬야만 하는 내용들인데… 그런 것도 꼼꼼히 알려주고 있습니다.

 

[입 개발] Kafka 에서 auto.offset.reset 의 사용법

최근에 서버로그를 컨슈밍해서 뭔가 작업할 일이 생겼습니다. 데이터를 매일 보면서 실제 얼마나 이벤트가 있었는지 볼려고 하니, 실제 로그의 수와 저장된 이벤트의 수가 엄청 다른 것입니다.

확인을 해보니, 로그가… 몇일 전꺼부터 계속 돌고 있었습니다. -_-;;; 즉, 몇일 치가 하루데이터라고 생각하고 계속 저장되고 있었으니, 생각하는 값보다 훨씬 많이 오버된 T.T

저만 그런 문제가 있는가 해서 뒤져보니, auto.offset.reset 이라는 환경설정 값을 찾을 수 있었습니다. auto.offset.reset 은 다음과 같이 설명이 되어 있습니다.

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest

일단 설정이 안되었을 때의 기본 값은 largest 입니다.(아주 옛날 소스는 autooffset.reset 이라는 설정에, latest 라는 설정으로 되어있었지만… 현재는 바뀌어 있습니다.)

smallest 는 가지고 있는 오프셋 값 중에 가장 작은 값을 사용합니다. largest는 반대로 가장 최신 offset 을 사용하게 됩니다.

handleOffsetOutOfRange 에서는 autoOffsetReset 값을 통해서 startTimestamp를 설정하고 실제 newOffset 을 earliestOrLatestOffset 을 통해서 가져오게 됩니다.

<pre>def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val startTimestamp = config.autoOffsetReset match {
    case OffsetRequest.SmallestTimeString =&gt; OffsetRequest.EarliestTime
    case OffsetRequest.LargestTimeString =&gt; OffsetRequest.LatestTime
    case _ =&gt; OffsetRequest.LatestTime
  }
  val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
  val pti = partitionMap(topicAndPartition)
  pti.resetFetchOffset(newOffset)
  pti.resetConsumeOffset(newOffset)
  newOffset
}</pre>

그럼 이 옵션을 언제 적용하게 되느냐? 즉, 바꿔말하면 handleOffsetOutOfRange이 언제 호출되는가를 알아야 합니다. AbstractFetcherThread.scala를 보게 되면 Errors.NONE 일 때는 해당 결과 offset 으로 현재 offset을 업데이트 합니다. partitionMap에 PartitionFetchState 로 그 offset을 저장합니다.

<pre>private def processFetchRequest(fetchRequest: REQ) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var responseData: Map[TopicAndPartition, PD] = Map.empty

  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    responseData = fetch(fetchRequest)
  } catch {
    case t: Throwable =&gt;
      if (isRunning.get) {
        warn(s"Error in fetch $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionMap.keys
          // there is an error occurred while fetching partitions, sleep a while
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) {
    // process fetched data
    inLock(partitionMapLock) {

      responseData.foreach { case (topicAndPartition, partitionData) =&gt;
        val TopicAndPartition(topic, partitionId) = topicAndPartition
        partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =&gt;
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
            Errors.forCode(partitionData.errorCode) match {
              case Errors.NONE =&gt;
                try {
                  val messages = partitionData.toByteBufferMessageSet
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) =&gt; m.nextOffset
                    case None =&gt; currentPartitionFetchState.offset
                  }
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
                } catch {
                  case ime: CorruptRecordException =&gt;
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    // should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
                  case e: Throwable =&gt;
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentPartitionFetchState.offset), e)
                }
              case Errors.OFFSET_OUT_OF_RANGE =&gt;
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =&gt;
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =&gt;
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    partitionData.exception.get))
                  partitionsWithError += topicAndPartition
                }
            }
          })
      }
    }
  }

  if (partitionsWithError.nonEmpty) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}</pre>

두 번째로는 Errors.OFFSET_OUT_OF_RANGE 에러가 발생했을 때입니다. 이 에러가 발생하면, 아까 본 handleOffsetOutOfRange 함수를 통해서 다시 시작 newOffset을 설정하게 됩니다. 그럼 이 에러는 언제 발생하게 될까요?

Kafka는 consumer 가 자신이 처리한 위치 까지의 offset을 가지고 있는 구조입니다.(보통 주키퍼에 /consumers/{groupid} 형태로 기록이 되지요.) 그러니, 주키퍼의 해당 consumers 목록에 없다면 해당 설정대로 값을 읽게 됩니다. 두 번째로 이미 데이터가 접근할 수 없는 offset 즉, 이미 지워진 상태면… 해당 값을 읽어올 수 없다면, auto.commit.reset의 설정이 동작하게 됩니다.

저는 처음에 이것이 kafka 내에 설정값이 있고 해당 값보다 크면, 항상 새로 읽을 줄 알았습니다. T.T

그럼, 어떻게 항상 최신 데이터부터만 읽을 것인가? 가장 쉬운 방법은 해당 groupid에 속하는 오프셋을 필요할 때마다 지워주는 것입니다. 이러면 항상 최신의 데이터 부터 읽어오는 것을 알 수 있습니다. 두 번째는 굉장히 빨리 데이터를 지우도록 설정하는 것인데, 이러면 의도하지 않은 동작(그 주기 이전부터 읽어오게 되므로) 원하는 형태는 아닙니다.

[입 개발] memcached 의 binary protocol 과 text protocol의 동작은 조금씩 다르다.

보통 가장 유명한 캐시 솔루션을 뽑으라면… Memcached 나 Redis 가 될 가능성이 높습니다.(당연히 다른 좋은 솔루션들도 많지만…) 가장 많이 쓰이는 건 이 두가지가 아닐듯 합니다.

Redis의 경우는 text protocol 만 지원하지만, memcached의 경우는 text와 binary protocol 두 가지를 모두 지원합니다. 실제로 binary protocol을 사용하면 당연히 속도도 더 빠릅니다.

그런데… 당연히 text/binary protocol 두 가지를 지원하다고 하면 두 동작이 모두 동일할꺼라고 생각하기 쉽습니다. 그러나… memcached 에서 text와 binary protocol은 미묘하게 동작이 다른 경우가 있습니다. 그리고 그 대표적인 예가 incr 입니다.

먼저 text protocol에서 “incr” 커맨드의 동작을 살펴보면 다음과 같습니다. 실제 text protocol은 process_command 라는 함수에서 동작하게 됩니다. 소스가 엄청 길어서 중간에 “incr”를 찾아보면 process_arithmetic_command 함수를 호출한다는 것만 알면됩니다.

static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */

    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false);
    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

        process_update_command(c, tokens, ntokens, comm, false);

    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {

        process_update_command(c, tokens, ntokens, comm, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {

        process_arithmetic_command(c, tokens, ntokens, 1);

    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

        process_get_command(c, tokens, ntokens, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

        process_arithmetic_command(c, tokens, ntokens, 0);

    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

        process_delete_command(c, tokens, ntokens);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {

        process_touch_command(c, tokens, ntokens);

    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {

        process_stat(c, tokens, ntokens);
    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
        time_t exptime = 0;
        rel_time_t new_oldest = 0;

        set_noreply_maybe(c, tokens, ntokens);

        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.flush_cmds++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        if (!settings.flush_enabled) {
            // flush_all is not allowed but we log it on stats
            out_string(c, "CLIENT_ERROR flush_all not allowed");
            return;
        }

        if (ntokens != (c->noreply ? 3 : 2)) {
            exptime = strtol(tokens[1].value, NULL, 10);
            if(errno == ERANGE) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
        }

        /*
          If exptime is zero realtime() would return zero too, and
          realtime(exptime) - 1 would overflow to the max unsigned
          value.  So we process exptime == 0 the same way we do when
          no delay is given at all.
        */
        if (exptime > 0) {
            new_oldest = realtime(exptime);
        } else { /* exptime == 0 */
            new_oldest = current_time;
        }
        if (settings.use_cas) {
            settings.oldest_live = new_oldest - 1;
            if (settings.oldest_live <= current_time)
                settings.oldest_cas = get_cas_id();
        } else {
            settings.oldest_live = new_oldest;
        }
        out_string(c, "OK");
        return;

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {

        out_string(c, "VERSION " VERSION);

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {

        conn_set_state(c, conn_closing);

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "shutdown") == 0)) {

        if (settings.shutdown_command) {
            conn_set_state(c, conn_closing);
            raise(SIGINT);
        } else {
            out_string(c, "ERROR: shutdown not enabled");
        }
    } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
        if (ntokens == 5 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0) {
            int src, dst, rv;

            if (settings.slab_reassign == false) {
                out_string(c, "CLIENT_ERROR slab reassignment disabled");
                return;
            }

            src = strtol(tokens[2].value, NULL, 10);
            dst = strtol(tokens[3].value, NULL, 10);

            if (errno == ERANGE) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }

            rv = slabs_reassign(src, dst);
            switch (rv) {
            case REASSIGN_OK:
                out_string(c, "OK");
                break;
            case REASSIGN_RUNNING:
                out_string(c, "BUSY currently processing reassign request");
                break;
            case REASSIGN_BADCLASS:
                out_string(c, "BADCLASS invalid src or dst class id");
                break;
            case REASSIGN_NOSPARE:
                out_string(c, "NOSPARE source class has no spare pages");
                break;
            case REASSIGN_SRC_DST_SAME:
                out_string(c, "SAME src and dst class are identical");
                break;
            }
            return;
        } else if (ntokens == 4 &&
            (strcmp(tokens[COMMAND_TOKEN + 1].value, "automove") == 0)) {
            process_slabs_automove_command(c, tokens, ntokens);
        } else {
            out_string(c, "ERROR");
        }
    } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "lru_crawler") == 0) {
        if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "crawl") == 0) {
            int rv;
            if (settings.lru_crawler == false) {
                out_string(c, "CLIENT_ERROR lru crawler disabled");
                return;
            }

            rv = lru_crawler_crawl(tokens[2].value);
            switch(rv) {
            case CRAWLER_OK:
                out_string(c, "OK");
                break;
            case CRAWLER_RUNNING:
                out_string(c, "BUSY currently processing crawler request");
                break;
            case CRAWLER_BADCLASS:
                out_string(c, "BADCLASS invalid class id");
                break;
            case CRAWLER_NOTSTARTED:
                out_string(c, "NOTSTARTED no items to crawl");
                break;
            }
            return;
        } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) {
            uint32_t tocrawl;
             if (!safe_strtoul(tokens[2].value, &tocrawl)) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
            settings.lru_crawler_tocrawl = tocrawl;
            out_string(c, "OK");
            return;
        } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "sleep") == 0) {
            uint32_t tosleep;
            if (!safe_strtoul(tokens[2].value, &tosleep)) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return;
            }
            if (tosleep > 1000000) {
                out_string(c, "CLIENT_ERROR sleep must be one second or less");
                return;
            }
            settings.lru_crawler_sleep = tosleep;
            out_string(c, "OK");
            return;
        } else if (ntokens == 3) {
            if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "enable") == 0)) {
                if (start_item_crawler_thread() == 0) {
                    out_string(c, "OK");
                } else {
                    out_string(c, "ERROR failed to start lru crawler thread");
                }
            } else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "disable") == 0)) {
                if (stop_item_crawler_thread() == 0) {
                    out_string(c, "OK");
                } else {
                    out_string(c, "ERROR failed to stop lru crawler thread");
                }
            } else {
                out_string(c, "ERROR");
            }
            return;
        } else {
            out_string(c, "ERROR");
        }
    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
        process_verbosity_command(c, tokens, ntokens);
    } else {
        out_string(c, "ERROR");
    }
    return;
}

다시 process_arithmetic_command를 살펴보면 다음과 같습니다.

static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
    char temp[INCR_MAX_STORAGE_LEN];
    uint64_t delta;
    char *key;
    size_t nkey;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if (!safe_strtoull(tokens[2].value, &delta)) {
        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
        return;
    }

    switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
    case OK:
        out_string(c, temp);
        break;
    case NON_NUMERIC:
        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
        break;
    case EOM:
        out_of_memory(c, "SERVER_ERROR out of memory");
        break;
    case DELTA_ITEM_NOT_FOUND:
        pthread_mutex_lock(&c->thread->stats.mutex);
        if (incr) {
            c->thread->stats.incr_misses++;
        } else {
            c->thread->stats.decr_misses++;
        }
        pthread_mutex_unlock(&c->thread->stats.mutex);

        out_string(c, "NOT_FOUND");
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        break; /* Should never get here */
    }
}

case DELTA_ITEM_NOT_FOUND 를 보면 아이템이 없으면 incr_misses를 증가시키고, NOT_FOUND를 리턴합니다. 즉 아이템이 없으면 단순히 없다고 리턴하는 것입니다.

그런데 incr의 binary protocol에서의 동작을 살펴보면 조금 미묘하게 다릅니다.
일단 binary protocol 에서 incr이 수행될때 최종 함수는 complete_incr_bin 이라는 함수입니다.

static void complete_incr_bin(conn *c) {
    item *it;
    char *key;
    size_t nkey;
    /* Weird magic in add_delta forces me to pad here */
    char tmpbuf[INCR_MAX_STORAGE_LEN];
    uint64_t cas = 0;

    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
    protocol_binary_request_incr* req = binary_get_request(c);

    assert(c != NULL);
    assert(c->wsize >= sizeof(*rsp));

    /* fix byteorder in the request */
    req->message.body.delta = ntohll(req->message.body.delta);
    req->message.body.initial = ntohll(req->message.body.initial);
    req->message.body.expiration = ntohl(req->message.body.expiration);
    key = binary_get_key(c);
    nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        int i;
        fprintf(stderr, "incr ");

        for (i = 0; i < nkey; i++) {
            fprintf(stderr, "%c", key[i]);
        }
        fprintf(stderr, " %lld, %llu, %d\n",
                (long long)req->message.body.delta,
                (long long)req->message.body.initial,
                req->message.body.expiration);
    }

    if (c->binary_header.request.cas != 0) {
        cas = c->binary_header.request.cas;
    }
    switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
                     req->message.body.delta, tmpbuf,
                     &cas)) {
    case OK:
        rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
        if (cas) {
            c->cas = cas;
        }
        write_bin_response(c, &rsp->message.body, 0, 0,
                           sizeof(rsp->message.body.value));
        break;
    case NON_NUMERIC:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, NULL, 0);
        break;
    case EOM:
        out_of_memory(c, "SERVER_ERROR Out of memory incrementing value");
        break;
    case DELTA_ITEM_NOT_FOUND:
        if (req->message.body.expiration != 0xffffffff) {
            /* Save some room for the response */
            rsp->message.body.value = htonll(req->message.body.initial);

            snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu",
                (unsigned long long)req->message.body.initial);
            int res = strlen(tmpbuf);
            it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
                            res + 2);

            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, res);
                memcpy(ITEM_data(it) + res, "\r\n", 2);

                if (store_item(it, NREAD_ADD, c)) {
                    c->cas = ITEM_get_cas(it);
                    write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
                } else {
                    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
                                    NULL, 0);
                }
                item_remove(it);         /* release our reference */
            } else {
                out_of_memory(c,
                        "SERVER_ERROR Out of memory allocating new item");
            }
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
                c->thread->stats.incr_misses++;
            } else {
                c->thread->stats.decr_misses++;
            }
            pthread_mutex_unlock(&c->thread->stats.mutex);

            write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, NULL, 0);
        }
        break;
    case DELTA_ITEM_CAS_MISMATCH:
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, NULL, 0);
        break;
    }
}

아까 전과 비슷한 코드들이 있습니다. 다시 case DELTA_ITEM_NOT_FOUND 를 찾아보면 뭔가 다른 부분을 볼 수 있을 것입니다. else 문 부분은 text_protocol과 동일한데, if 문이 뭔가 추가되었네요.

        if (req->message.body.expiration != 0xffffffff) {
            /* Save some room for the response */
            rsp->message.body.value = htonll(req->message.body.initial);
    
            snprintf(tmpbuf, INCR_MAX_STORAGE_LEN, "%llu",
                (unsigned long long)req->message.body.initial);
            int res = strlen(tmpbuf);
            it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
                            res + 2);
    
            if (it != NULL) {
                memcpy(ITEM_data(it), tmpbuf, res);
                memcpy(ITEM_data(it) + res, "\r\n", 2);

                if (store_item(it, NREAD_ADD, c)) {
                    c->cas = ITEM_get_cas(it);
                    write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
                } else {
                    write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED,
                                    NULL, 0);
                }
                item_remove(it);         /* release our reference */
            } else {
                out_of_memory(c,
                        "SERVER_ERROR Out of memory allocating new item");
            }
        }

코드를 보면 expiration 이 -1이 아니면 값을 셋팅하는 부분이 있습니다. 즉 binary protocol에서는 해당 키가 없더라도… 해당 시점에 생성이 가능합니다. text protocol 에서는 불가능 한데 말이죠. 또한, 최초에 아이템이 없을때만 생성이되고, 있을 경우에는 expiration을 변경하지도 않습니다. 코드를 보니… 궁금증들이 해결이 되는 것 같습니다.

[입 개발] RabbitMQ를 제대로 설치하는 방법

개발을 하다보면, Queue를 써야 하는 경우가 종종 있습니다. 뭔가를 비동기 적으로 처리하기 위해서, 또는 다이렉트로 들어가는 부담을 줄이기 위해서…

그런데 그러다보면 항상 고민되는 것이 바로 HA 입니다. 뭐, Queue의 모든 데이터가 날아가도 된다. 이러면 사실 큰 문제가 아니겠지만… 일반적으로 Queue의 데이터를 그렇게 날려도 되는 경우는 많지 않습니다.

일반적으로 많이 사용하는 Queue 는 여기서 소개할 RabbitMQ, ActiveMQ, Redis 기반의 Resque나 SideKiq(Resque 보다는 Sidekiq이 ㅎㅎㅎ) 아니면, 로그나 이런 대규모 부하에서는 거의 디팩토인 kafka 가 있습니다.

일단 각각 장단점이 있기 때문에, 여기서는 꽤 안정성을 보장해 주는 RabbitMQ의 HA를 구성하는 설치에 대해서만 설명하도록 하겠습니다.(다만 RabbitMQ의 경우는 운영하다 Consumer 가 느리면… 뻗어버리는 단점이…)

사실 설치 자체는 RabbitMQ 사용법을 그대로 따르면 됩니다. 별로 어려운 것도 없습니다.

CentOS면 yum으로, Ubuntu 면 apt로 쉽게 설치가 가능합니다. Rabbitmq 공식 페이지를 참조하면 됩니다.

그리고 보통은 클러스터를 설정합니다. 이 설정 방법도 쉽습니다. erlang 쿠키를 맞춰주고, join_cluster 만 해주면 됩니다. RabbitMQ HA Guide를 참고하시면 쉽습니다.

그러면 무엇을 해야하는가? 바로 이 Cluster가 그냥 쓰면 장애의 원인이 되기 때문입니다. 기본적으로, RabbitMQ에서 Cluster 모드가 되면, 채널 정보가 보통 Disk Node 에 저장되게 되는데, 이 서버가 내려가면… 해당 채널들은 모두 사용이 불가능해집니다.(장애시, HA를 하려고 설정한건데… 도리어, 장애가 나는거죠. 물론 그냥 죽어도 장애긴 합니다.)

그럼 제대로 RabbitMQ에서 HA를 구성하는 방법은 무엇일까요?

바로 mirrored queue를 운영하는 것입니다. RabbitMQ는 mirroed queue 라고 해서, 해당 Queue의 내용이 클러스터된 서버내에 중복되어 저장되는 기능을 의미합니다. 다만, 디폴트 설정으로는 설정이 된 시점의 데이터 부터, 복사가 되고 그 이전의 데이터는 복사되지 않습니다. (물론 이것도 옵션에 의해서 모두 가져가도록 할 수 있습니다.)

Mirrored Queue가 구성하는 방법 역시 RabbitMQ HA Guide에 잘 나와있습니다.

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}

이러면… ha. 으로 시작하는 queue 들은 mirrored 형태로 구성하겠다는 것입니다. 실제 web ui에서 보면 Queue 이름에 두대로 클러스터가 되었다면 ha.test +1, 세 대라면 ha.test +2 식으로 구성되어야만 제대로 설정이 된 것입니다.

이제 어떤 서버로 연결할 것인가의 이슈가 생깁니다. 앞에 Load Balancer 를 붙여서(haproxy든 L4든) 장애난 서버를 제외하고 연결하게 하면… 제대로 된 서비스가 가능해집니다.