[입 개발] Redis Replication 이 실패하는 경우에 살펴 보아야 할 것들…

Redis 에서 Replication은 매우 핫 한 기능입니다. 다만, 메모리를 많이 쓰고 있는 경우에, Replication을 새로 걸어서 새로운 슬레이브를 추가하고 싶다면, Replication이 실패하는 경우가 생길 수 있습니다. 이 때, 어떤 것들을 보아야 할지… 알아보도록 하겠습니다.

1. 디스크의 여유 공간을 확인한다.
-> 의외로 쉽게 발생할 수 있는 이슈입니다. Redis는 Default로 fork 후에 메모리의 내용들을 압축해서 RDB로 저장하게 됩니다. 즉, 디스크에 여유 공간이 없으면 실패하게 됩니다. 이 때는 뭐, 간단하게 디스크 여유 공간을 만들어줌으로써, 해결할 수 있습니다.

3.0에서 추가될것으로 보이는 것 중에, 기존에는 Replication을 위해서 RDB 파일을 로컬에 저장하고 이를 읽어서 전달하는 방식이었는데, 이제, 파일을 저장하지 않고 메모리 상황에서 바로 보내는 기능이 추가되는 중이라서, 이 이슈는 점점 없어질 것으로 보입니다.

다만 2.8까지라면… 마스터/슬레이브 모두 디스크 여유 공간을 확인해야 합니다.

2. 디렉토리의 퍼미션을 확인한다.
-> 위와 같은 이유지만, 퍼미션 역시 확인해야 합니다. 이 경우 RDB 파일을 저장하지 못해서, 계속 마스터/슬레이브 연결이 실패하게 됩니다. 이때 재미난 상황은 슬레이브에서는 마스터랑 연결되었다고 나오지만… 마스터는 슬레이브가 연결된걸 확인하지 못합니다. SYNC 명령이 안끝나서 그렇게 인식이 되는… 재미난…

3. 메모리 사이즈를 확인한다.
-> 보통 버추얼 메모리라는 것은 물리 메모리 + Swap 사이즈를 말합니다. 이 크기를 넘어가면 프로세스가 죽을 수 있는데, vm.overcommit_memory 를 설정함으로써, 회피할 수 있습니다. 다만 이것도 너무 크기가 크면 실패할 수 있습니다.

sysctl vm.overcommit_memory=1

[입 개발] hadoop fs -tail [-f] URI 의 구현에 대해서

가끔씩 보면 tail -f 는 어떻게 동작할지에 대해서 궁금할때가 생깁니다. 얘가 무슨 수로… 뒤에를 계속 읽어올까?
OS에서 제공하는 notify 관련 함수를 이용할까? 등의 별별 생각을 해보지만… 사실 백가지 생각이 불여일견입니다.

hadoop cmd 는 각각의 command 클래스를 이용하는 CommandPattern 형태로 되어있습니다. 그러나 여기서 관심있는 것은 오직 tail 뿐… 그런데, 너무나 간단하게 Tail 소스만 까면… 끝납니다.

1. -f 옵션이 붙으면, 무한루프를 돈다.(파일 사이즈를 계산해서 offset 보다 적으면 종료)
2. 한번 looping 후에 followDelay 만큼 sleep 한다.
3. 기본적으로 현재 파일 사이즈의 끝 – 1024 만큼의 offset 부터 읽는다.
4. 한번에 읽어들이는 내용은 conf 에 정의되어 있다.(여기서 사이즈가 1024보다 적으면… 음… 맨 뒤가 아닐수도 있는데, 이부분은 뭔가 최저값이 셋팅되지 않을까?)

class Tail extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Tail.class, "-tail");
  }
..
  public static final String NAME = "tail";
  public static final String USAGE = "[-f] <file>";
  public static final String DESCRIPTION =
    "Show the last 1KB of the file.\n" +
    "\t\tThe -f option shows appended data as the file grows.\n";

  private long startingOffset = -1024;
  private boolean follow = false;
  private long followDelay = 5000; // milliseconds

  @Override
  protected void processOptions(LinkedList<String> args) throws IOException {
    CommandFormat cf = new CommandFormat(1, 1, "f");
    cf.parse(args);
    follow = cf.getOpt("f");
  }
  // TODO: HADOOP-7234 will add glob support; for now, be backwards compat
  @Override
  protected List<PathData> expandArgument(String arg) throws IOException {
    List<PathData> items = new LinkedList<PathData>();
    items.add(new PathData(arg, getConf()));
    return items;
  }

  @Override
  protected void processPath(PathData item) throws IOException {
    if (item.stat.isDirectory()) {
      throw new PathIsDirectoryException(item.toString());
    }

    long offset = dumpFromOffset(item, startingOffset);
    while (follow) {
      try {
        Thread.sleep(followDelay);
      } catch (InterruptedException e) {
        break;
      }
      offset = dumpFromOffset(item, offset);
    }
  }

  private long dumpFromOffset(PathData item, long offset) throws IOException {
    long fileSize = item.refreshStatus().getLen();
    if (offset > fileSize) return fileSize;
    // treat a negative offset as relative to end of the file, floor of 0
    if (offset < 0) {
      offset = Math.max(fileSize + offset, 0);
    }
....
    FSDataInputStream in = item.fs.open(item.path);
    try {
      in.seek(offset);
      // use conf so the system configured io block size is used
      IOUtils.copyBytes(in, System.out, getConf(), false);
      offset = in.getPos();
    } finally {
      in.close();
    }
    return offset;
  }
}

[입 개발] RabbitMQ 설치하기 for CentOS 6.5

1. OS: Centos 6.5
2. 설치
– erlang(EPEL)
-> wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
-> yum install erlang

– rabbitmq
-> http://www.rabbitmq.com/install-rpm.html
-> http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4-1.noarch.rpm
-> sudo rpm -Uvh rabbitmq-server-3.3.4-1.noarch.rpm

3. 설정
– /etc/hosts 호스트를 등록한다.
– /etc/rabbitmq/rabbitmq.config

[
   {mnesia, [{dump_log_write_threshold, 1000}]},
   {rabbit, [
       {tcp_listeners, [5672]},
        {log_levels, [{connection, info}]}
   ]},
   {rabbitmq_management, [
       {listener,[{port, 55672}]},
       {redirect_old_port, false}
   ]}
 ].

– .erlang.cookie 설정(/var/lib/rabbitmq/.erlang.cookie)
-> 동일하게 맞춘다.

4. rabbitmq 설정(rabbit1, rabbit2 서버 두대인 경우)
– rabbit1> sudo service rabbitmq-server start
– rabbit2> sudo service rabbitmq-server start
– rabbit2> sudo rabbitctl stop_app
– rabbit2> sudo rabbitmqctl join_cluster –ram rabbit@indigo117
– rabbit2> sudo rabbitmqctl change_cluster_node_type disc
– rabbit2> sudo rabbitmqctl start_app
– rabbit1> sudo rabbitmqctl cluster_status
– rabbit2> sudo rabbitmqctl cluster_status
– rabbit1> sudo rabbitmqctl set_policy ha-all “^\.” ‘{“ha-mode”:”all”}’

5. web plugin(all node)
– rabbitmq-plugins enable rabbitmq_management
– sudo service rabbitmq-server restart

6. 유저 설정
– sudo rabbitmqctl delete_user guest
– sudo rabbitmqctl add_user test 1234
– sudo rabbitmqctl set_user_tags test administrator

7. http://servername:55672/

8. example
– pip install pika

#!/usr/bin/env python
import pika

#Very Important!!!
credentials = pika.PlainCredentials("id", "password")
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', credentials=credentials))

channel = connection.channel()
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

[입 개발:Redis] 나의 잘못된 오해, AOF

오늘 제대로 알고 있지 못한 부분에 대해서 한 독자님의 지적을 받고… AOF 관련 코드를 유심히, 그리고 좀 자세히 보게 되었습니다. 그런데… 음… 제가 완전히 잘못 알고 있던 부분이 있었습니다.

일단, 저는 Redis 의 AOF 가 DB의 WAL(Write ahead Log) 의 변종이라고 생각하고 있습니다. 먼저 Write ahead Log 에 대해서 아주 간략하게 설명하자면…(이번에 조사하면서 WAL조차도 잘못 이해하고 있었다는 걸 알았습니다. T.T)

데이터의 변경이 발생하기 전에 이 변경사항에 대한 Log를 남기고, 이를 이용해서 Data의 durability 를 보장하는 방법입니다. 디비등에서는 실제 데이터 영역의 변경을 하기 전에, 이에 대한 변경 사항을 commit시에 Log로 남기고, 이를 이용해서 나중에 실제 데이터 영역을 변경하기 위해서 사용하기도 합니다.

여기서 중요한 부분은 Log 가 persistent 할 수도 있고, 아닐 수도 있다는 점입니다. 왜냐하면 매번 disk에 write가 발생하면, 느려질테니깐요. 그래서 보통 Log를 Buffering 하고 이를 한꺼번에 쓰는 형태의 작업을 하게됩니다. 그런데… 여기서 이제부터 일반적으로 고민이 시작되는거죠.

DB의 데이터는 중요하다. 그런데 insert, update, delete등에 의해서 변경이 벌어지는데, 이것이 log buffer에 쌓이고, 실제 디스크에 쓰여지지 않는다면, 데이터의 유실이 발생할 수도 있는 것입니다.

그래서 mysql 의 innodb 의 경우는 innodb_flush_log_at_trx_commit 옵션을 이용해서 Disk에 flush 하는 주기를 조절하거나, 매번하도록 되어있습니다.(여기서의 기준은 하나의 Query Event 가 그 단위가 되는 것입니다.)

그럼 Redis 의 AOF는 무엇이 다르냐?

어떻게 보면, 거의 유사합니다. 하지만 다음과 같은 부분이 다릅니다.
1. AOF buffer 에 데이터를 남기는 시점이, 실제 메모리에 데이터가 변경된 이후이다.
-> 데이터의 메모리 변경 후에, 커맨드를 만들어서 AOF buffer 에 저장한다.
2. 그리고 실제 disk 에 flushing 하는 시점은 매 event loop의 시작 부분인 beforesleep 에서 동작한다.
-> 즉 AOF buffer 들어 있는 내용은, 하나의 event loop가 모두 끝난 다음에 디스크에 쓰여진다.
-> 각각의 버퍼는 각 명령 수행뒤에 propagation 에서 만들어짐.
-> Redis 는 single thread로 동작하기 때문에, 이 사이에 만약 1024개의 커맨드가 처리되었다고 한다면,
그 사이에 장애가 발생하면 해당 데이터를 돌릴 수 있는 방법은 없다.

여기서 잘못된 저의 오해는
1. Mysql처럼 Query 단위로, 데이터의 변경이 발생하기 전에 Logging이 되어야 된다고 생각
-> 그러나 실제로 Redis 에서는 커맨드 실행 후에, AOF buffer 만 만들어서 저장
-> event loop 전의 beforeSleep 에서 flushAppendOnlyFile 을 호출해서 AOF Buffer 를 Disk에 Flush 함.

그러면 옵션의 appendfsync 는 어떻게 동작하는가? 다음과 같습니다.
1. aof buffer 의 디스크에 쓰기는 오직 flushAppendOnlyFile()에서만 저장된다.
2. appendfsync가 no 면, 그냥 beforeSleep때 마다 os의 write를 호출하고, 실제 os와 disk간의 sync 는 os에
맡긴다.
3. EVERYSEC의 현재 fsync 작업이 스레드 큐에 존재하면, write를 하지 않고 return.
없으면 write 후에 fsync 를 타 스레드에 하도록 돌림.
만약 계속 fsync 작업이 남아있는걸로 판단하면, 그냥 write 함.
EVERYSEC 으로 되어있지만, 해당 cron 작업에 따라, 더 느려질 가능성도 존재.
4. ALWAYS의 경우, 매번 beforeSleep에서 디스크에 쓰고, fsync 도 동기로 호출

즉 Redis 의 AOF는 어떤 옵션을 쓰더라도, Write가 많을 경우에는 장애가 발생할 경우, 바로 직전의 명령이 아니라, 한 이벤트 루프 안에서 업데이트된 꽤 많은 데이터가 유실될 가능성도 있다는 걸 알아두고 사용하시면 좋을듯 합니다.

서버를 만드실때는 포트를 32768 이전으로 설정하세요.

최근에 아주 재미난(?) 일을 격었습니다. 서버를 시작시키는데, 아무리해도 서버가 뜨지 않는 것이었죠.
코드를 봐서는 아무런 문제가 없는데… 에러 로그는

“binding error: port is already used” 비슷한 오류가 발생하는 것이었습니다.

netstat 으로 LISTEN 포트만 찾아봐서는… 제가 사용하는 포트를 찾을 수도 없었죠.

그런데 netstat으로 포트를 더 찾아보니… 이상한 결과를 볼 수 있었습니다.

tcp        0      0 192.168.1.4:48121          192.168.1.4:6379         ESTABLISHED 15598/redis-server

여기서 제 서버에서 사용하고자 하던 포트를 48121 이라고 가정합니다. -_-(잉, 뭔가 가정이 이상하다구요. 그렇죠, 저도 뭔가 많이 이상합니다. 그러나 그것이 현실로 일어났습니다.)

그리고 다시 한번 제목을 보시죠.. 일단 포트에 대해서 잘 모르는 지식을 잠시 정리하자면 다음과 같습니다.

  • socket 이 바인딩하게 되는 포트는 udp/tcp 는 별개다. 즉 udp:53, tcp:53번 모두 바인딩 가능합니다. 실제로 DNS가 일반적으로 이렇게 하고 있죠.
  • socket 이 바인딩하게 되는 주소는 ip + port 다 즉 1.2.3.4 를 가지고 있는 서버는 127.0.0.1:48121 과 1.2.3.4:48121 를 따로 바인딩할 수 있다.

즉 다음과 같은 예제를 보면 에러가 발생해야 합니다.

# -*- coding: utf-8 -*-.

from flask import Flask, request, redirect, url_for, jsonify

app = Flask(__name__,static_folder='static', static_url_path='')

@app.route('/health_check.html')
def health():
    return "OK"

if __name__ == '__main__':
    app.run(host="127.0.0.1", port=20000)

두 개를 실행시키면 다음과 같이 오류가 발생합니다.

 * Running on http://127.0.0.1:20000/
Traceback (most recent call last):
  File "2.py", line 23, in <module>
    app.run(host="127.0.0.1", port=20000)
  File "/Users/charsyam/anaconda/lib/python2.7/site-packages/flask/app.py", line 772, in run
    run_simple(host, port, self, **options)
  File "/Users/charsyam/anaconda/lib/python2.7/site-packages/werkzeug/serving.py", line 710, in run_simple
    inner()
  File "/Users/charsyam/anaconda/lib/python2.7/site-packages/werkzeug/serving.py", line 692, in inner
    passthrough_errors, ssl_context).serve_forever()
  File "/Users/charsyam/anaconda/lib/python2.7/site-packages/werkzeug/serving.py", line 486, in make_server
    passthrough_errors, ssl_context)
  File "/Users/charsyam/anaconda/lib/python2.7/site-packages/werkzeug/serving.py", line 410, in __init__
    HTTPServer.__init__(self, (host, int(port)), handler)
  File "/Users/charsyam/anaconda/lib/python2.7/SocketServer.py", line 419, in __init__
    self.server_bind()
  File "/Users/charsyam/anaconda/lib/python2.7/BaseHTTPServer.py", line 108, in server_bind
    SocketServer.TCPServer.server_bind(self)
  File "/Users/charsyam/anaconda/lib/python2.7/SocketServer.py", line 430, in server_bind
    self.socket.bind(self.server_address)
  File "/Users/charsyam/anaconda/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: [Errno 48] Address already in use

이제 강제로 아이피를 설정해보겠습니다.

127.0.0.1, 192.168.1.4, 0.0.0.0 으로 세 개의 ip로 실행을 시켰습니다. 모두 20000 번으로 실행했습니다.

charsyam ~/kakao_home_affiliate (master*) $ python 1.py
 * Running on http://127.0.0.1:20000/

charsyam ~/kakao_home_affiliate (master*) $ python 2.py
 * Running on http://0.0.0.0:20000/

charsyam ~/kakao_home_affiliate (master*) $ python 3.py
 * Running on http://192.168.1.4:20000/

charsyam ~ $ telnet 0 20000 <-- 이건 실제로 local loopback으로 전달되므로 127.0.0.1 과 동일합니다.
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
GET /health_check.html HTTP/1.1

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 2
Server: Werkzeug/0.9.4 Python/2.7.5
Date: Mon, 14 Apr 2014 14:48:43 GMT

OK

charsyam ~ $ telnet 127.0.0.1 20000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
GET /health_check.html HTTP/1.1

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 2
Server: Werkzeug/0.9.4 Python/2.7.5
Date: Mon, 14 Apr 2014 14:48:53 GMT

OK

charsyam ~ $ telnet 192.168.1.4 20000
Trying 192.168.1.4...
Connected to 192.168.1.4.
Escape character is '^]'.
GET /health_check.html HTTP/1.1

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 2
Server: Werkzeug/0.9.4 Python/2.7.5
Date: Mon, 14 Apr 2014 14:49:08 GMT

OK

모두 동작하는 걸 볼 수 있습니다.

charsyam ~/kakao_home_affiliate (master*) $ python 1.py
 * Running on http://127.0.0.1:20000/
127.0.0.1 - - [14/Apr/2014 23:48:43] "GET /health_check.html HTTP/1.1" 200 -
127.0.0.1 - - [14/Apr/2014 23:48:53] "GET /health_check.html HTTP/1.1" 200 -

charsyam ~/kakao_home_affiliate (master*) $ python 2.py
 * Running on http://0.0.0.0:20000/

charsyam ~/kakao_home_affiliate (master*) $ python 3.py
 * Running on http://192.168.1.4:20000/
192.168.1.4 - - [14/Apr/2014 23:49:08] "GET /health_check.html HTTP/1.1" 200 -

실제로 0.0.0.0 이 INADDR_ANY이지만 127.0.0.1 이나 192.168.1.4로 명시하면, 각각의 주소가 명시되므로 거기로 전달되게 됩니다.
(이러면… port 를 탈취하는 것도 가능할것 같네요. 실제로 테스트해보면 192.168.1.4를 kill 하면 그 다음부터는 0.0.0.0 으로 전달이 되네요.)

자자… 그러면… 이제 무슨 일이 있었던 걸까요?

정리하자면… 실제 서버가 Redis 서버에 연결되면서 하필이면 192.168.1.4:48121를 만들어서 실제 192.168.1.4:6379 에 붙어버린겁니다. 그러면서 서버는 48121포트를 할당하려고 하니 이미 bind 에러가 발생한것이죠.(흑흑흑, 정말 재수가 없었던겁니다.)

그럼… 이를 해결하는 방법이 있을까요? 뭔가 할당되는 정책이 있을까요? 이것을… 사실 local port 라고 합니다. 외부로 나가기 위해서 만들어지는 port 이죠.

이 값은 그럼 어떻게 설정될까요? 기본적으로 shell 에서 sysctl -A 를 해보면 거기서 다음과 같은 두 개의 값을 발견할 수 있습니다.

net.ipv4.ip_local_port_range = 32768	61000
net.ipv4.ip_local_reserved_ports =

ip_local_port_range 를 보면 기본적으로 저 값 안에서 32768에서 61000번 사이에서 할당이 하게 됩니다. 즉, 외부에서 접속하는 소켓의 포트가 위의 범위에서 할당되므로, 이 값을 피하는게 좋습니다. 위의 32768, 61000이 기본으로 설정되는 범위입니다. 즉 다음과 같은 port는 서버 설정시에 피하시는게 좋습니다.

그럼 실제로 커널 코드를 까보면 다음과 같습니다. 코드는 지루할테니 설렁설렁 보겠습니다. linux kernel 3.14를 기본으로 합니다.
먼저 해당 범위 값을 가져오는 함수는 inet_get_local_port_range 함수입니다.

void inet_get_local_port_range(struct net *net, int *low, int *high)
{
  unsigned int seq;

  do {
    seq = read_seqbegin(&net->ipv4.sysctl_local_ports.lock);

    *low = net->ipv4.sysctl_local_ports.range[0];
    *high = net->ipv4.sysctl_local_ports.range[1];
  } while (read_seqretry(&net->ipv4.sysctl_local_ports.lock, seq));
}

그리고 이 함수를 inet_csk_get_port 에서 호출합니다. 이 코드에서 실제로 local_port 가 결정납니다.

    do {
      if (inet_is_reserved_local_port(rover))
        goto next_nolock;
      head = &hashinfo->bhash[inet_bhashfn(net, rover,
          hashinfo->bhash_size)];
      spin_lock(&head->lock);
      inet_bind_bucket_for_each(tb, &head->chain)
        if (net_eq(ib_net(tb), net) && tb->port == rover) {
          if (((tb->fastreuse > 0 &&
                sk->sk_reuse &&
                sk->sk_state != TCP_LISTEN) ||
               (tb->fastreuseport > 0 &&
                sk->sk_reuseport &&
                uid_eq(tb->fastuid, uid))) &&
              (tb->num_owners < smallest_size || smallest_size == -1)) {
            smallest_size = tb->num_owners;
            smallest_rover = rover;
            if (atomic_read(&hashinfo->bsockets) > (high - low) + 1 &&
                !inet_csk(sk)->icsk_af_ops->bind_conflict(sk, tb, false)) {
              snum = smallest_rover;
              goto tb_found;
            }
          }
         if (!inet_csk(sk)->icsk_af_ops->bind_conflict(sk, tb, false)) {
            snum = rover;
            goto tb_found;
          }
          goto next;
        }
      break;
    next:
      spin_unlock(&head->lock);
    next_nolock:
      if (++rover > high)
        rover = low;
    } while (--remaining > 0);

뭔가 코드는 복잡하지만 inet_is_reserved_local_port 를 이용해서 예약된 local_port 는 피하는 것을 알 수 있습니다.
(udp는 또 udp_lib_get_port 에서 결정이 됩니다.)

물론 여기서 좀 더 깊게는 더 커널을 살펴봐야 겠지만… 대략 위의 형태로 동작한다는 것을 알 수 있습니다. 일단 이번 글은 여기서 끝!!!

[입 개발] Redis 가 멀티스레드라구요?

제가 항상 강조하는 것중에 Redis는 멀티스레드가 아니고 싱글 스레드이기 때문에 항상 사용에 주의해야 한다고 말을 드렸는데… 뭔가 깽기는게 있어서 ps -eLf 를 해보겟습니다.

charsyam@charsyam-vm-main:~/redis$ ps -eLf | grep "redis"
charsyam 31860  2920 31860 10    3 22:58 pts/0    00:00:05 src/redis-server *:6379
charsyam 31860  2920 31861  0    3 22:58 pts/0    00:00:00 src/redis-server *:6379
charsyam 31860  2920 31862  0    3 22:58 pts/0    00:00:00 src/redis-server *:6379

헉… 무려 스레드가 3개가 떠 있습니다. 자 바로 주먹에 돌을 쥐시면서, 이 구라쟁이야 하시는 분들이 보이시는 군요. (퍽퍽퍽!!!)

자… 저는 분명히 맨날 싱글 스레드 싱글 스레드라고 외쳤는데… Redis는 무려 멀티 스레드 어플리케이션이었던 것입니다!!!

그러면… 이 스레드들을 늘리면… 엄청난 성능 향상이 있을까요?

힌트를 드리자면, 이 스레드들은… 성능과 영향은 있지만… 더 늘린다고 해서 성능 향상이 생기고 기존 명령이 한꺼번에 많이 처리되지는 않는다는 것입니다.

이게 무슨소리냐!!! 라고 하시는 분들이 계실껍니다.

먼저, 목숨을 부지하기 위해서 결론부터 말씀드리자면… 이 두 스레드는 Redis에서 데이터를 처리하는 스레드가 아닙니다.(진짜예요!!! 이번엔 믿어주세요 T.T)

Redis의 스레드를 처리하는 파일은 bio.h 와 bio.c 이고 bio.h 를 보면 다음과 같은 코드를 볼 수 있습니다.

#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS       2

REDIS_BIO_NUM_OPS는 몇개의 잡큐(개별 하나당 스레드)를 만들 것인지에 대한 내용이고, REDIS_BIO_CLOSE_FILE 과 REDIS_BIO_AOF_FSYNC를 보면… 아하.. 이것들이 뭔가 데이터 처리를 안할꺼라는 믿음이 생기시지 않습니까?(퍽퍽퍽)

크게 두 가지 함수가 존재합니다. 하나는 작업 큐에 작업을 넣는 bioCreateBackgroundJob 함수
그리고 이걸 실행하는 bioProcessBackgroundJobs 입니다.

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_condvar[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Make the thread killable at any time, so that bioKillThreads()
     * can work reliably. */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        redisLog(REDIS_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

보면 두 개의 잡큐가 각각 CLOSE 와 AOF_FSYNC 를 처리하고 있습니다. 그리고 이 두 잡큐에 잡을 넣는 것은 모두 aof.c 에 존재합니다.

하나는 aof_background_fsync 함수이고, 나머지 하나는 backgroundRewriteDoneHandler 에서 호출하고 있습니다. 하나는 aof_를 닫을 때 이를 Async 하게 close 하기 위한 것이고, 또 하나는 aof 작업중에 fsync를 통해서 데이터를 동기화 시키는 부분입니다.

이 것들은 disk를 flush 하거나, 파일을 닫기위해서 OS 작업이 되는 것을 해당 스레드에서 하게 되면, 블럭이 되어서 다른 작업이 느려질 수 있으므로, 해당 작업들을 OS 레벨에서 비동기로 처리하기 위한 것입니다.

즉, 이 스레드들은… 더 늘릴 필요도 없고(AOF는 한번에 하나만 생성이 됩니다.) 더 늘린다고 해서 실제로 Redis 자체의 작업을 빠르게 해주는 게 아니라는 것입니다.

즉, 여전히 Redis 는 싱글 스레드라고 보셔야 합니다.

[입 개발] memcached slabclass 에 대해서…

오래간 만에 memcached 소슬 보니, 너무 잘못 이해하고 있거나 한것들이 많아서 처음부터 새로 보면서 이번에는 기록을 좀 남겨둘려고 합니다. 흔히들 memcached가 내부적으로 메모리를 어떻게 관리하는지 잘 아시지만, 코드 레벨로는 잘 모르실 수도 있기 때문에 그냥 정리합니다.

먼저 간단히 용어를 정리하자면…

  • chunk_size : key + value + flag 정보를 저장하기 위한 최소 크기: 기본 48
  • factor : item size 크기를 얼마만큼씩 증가시킬지 결정하는 값: 기본 1.25
  • Chunk_align_bytes : chunk 할당시에 사용하는 align : 8
  • item_size_max: 최대 item의 크기: 기본 1MB

그리고 사이즌 chunk_size * 1.25^(n-1) 형태로 증가하게 됨.

이제 slab.c를 보면 slabclass_t 를 볼 수 있습니다.

typedef struct {
    unsigned int size;      /* sizes of items */
    unsigned int perslab;   /* how many items per slab */

    void *slots;           /* list of item ptrs */
    unsigned int sl_curr;   /* total free items in list */

    unsigned int slabs;     /* how many slabs were allocated for this class */

    void **slab_list;       /* array of slab pointers */
    unsigned int list_size; /* size of prev array */

    unsigned int killing;  /* index+1 of dying slab, or zero if none */
    size_t requested; /* The number of requested bytes */
} slabclass_t;

static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];

MAX_NUMBER_OF_SLAB_CLASSES 는 201로 정의되어 있습니다. 즉 최대 201개의 slabclass 가 만들어지는데, 실제로는 chunk_size 와 factor 값에 따라서 최대 item_size_max를 넘어가 버리면, slabclass는 거기까지만 사용됩니다.(slab_init 를 보면 쉽게 알 수 있습니다.)

/**
 * Determines the chunk sizes and initializes the slab class descriptors
 * accordingly.
 */
void slabs_init(const size_t limit, const double factor, const bool prealloc) {
    int i = POWER_SMALLEST - 1;
    unsigned int size = sizeof(item) + settings.chunk_size;

    ......
    ......

    while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
        /* Make sure items are always n-byte aligned */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

        slabclass[i].size = size;
        slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
        size *= factor;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }

    power_largest = i;
    slabclass[power_largest].size = settings.item_size_max;
    slabclass[power_largest].perslab = 1;
    if (settings.verbose > 1) {
        fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                i, slabclass[i].size, slabclass[i].perslab);
    }

    ......
    ......

위의 소스에 나오는 size는 slab별로 할당되는 기본 사이즈의 크기이고 위의 slabclass 구조체에는 item_size_max(기본 1MB) 를 넣어주고, perslab 에는 item_size_max / size로 몇개의 아이템이 들어갈 수 있는지 들어가게됩니다.

그리고 이 slab 안에 array로 item 들이 할당되게 됩니다. 기본적으로 array의 크기는 16으로 설정되고 그 뒤로는 2배씩 증가하게 됩니다. 관련 함수는 grow_slab_list를 보시면 됩니다. 그리고 slab에서 사용하는 chunk가 항상 item_size_max 인것은 아니고, size * perslab으로 될 때도 있습니다.(do_slabs_newslab 에서 확인 가능, memory_allocate 를 이용함)

static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int len = settings.slab_reassign ? settings.item_size_max
        : p->size * p->perslab;
    char *ptr;

    if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) ||
        (grow_slab_list(id) == 0) ||
        ((ptr = memory_allocate((size_t)len)) == 0)) {

        MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
        return 0;
    }

    memset(ptr, 0, (size_t)len);
    split_slab_page_into_freelist(ptr, id);

    p->slab_list[p->slabs++] = ptr;
    mem_malloced += len;
    MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

    return 1;
}

slabclass 는 여기까지 하고, 다음번에는 실제 item 의 추가 삭제시에 어떻게 되는가에 대해서 정리해보도록 하겠습니다.

[Redis] Scan/SScan/ZScan/HScan 이야기…

Redis를 사용하다보면 특정 Key 목록을 가져오고 싶다는 욕망이 항상 생깁니다. 기본적으로 Single Thread 이기 때문에, 전체 Key를 도는 것은 너무 비용이 비싸서…
특정 Key단위로 나눠서 컬렉션에 넣고, 다시 이걸 이용해서 컬렉션을 만드는 짓(?)을 해야만 했었다는 문제가 있었습니다. 그래서, 결국, Redis에도 추가된 명령이 바로

scan/sscan/zscan/hscan 입니다.
scan은 전체 key 목록에서, sscan은 set 안에서, zscan 은 sorted set 안에서, hscan은 hash 안에서 키를 가져오는 명령입니다.

특징은 DB에서 처럼 cursor 라는 개념이 있고 이로 인해서 기본 10개 또는 지정된 개수만큼 가져오는 것이 가능합니다. 또한 keys 명령처럼 패턴으로도 찾을 수 있습니다.

다음과 같은 명령으로 간단하게 사용할 수 있습니다. scan 과 sscan/zscan/hscan의 차이는 key를 지정하는냐 마느냐의 차이 밖에 없습니다.(해당 키 안에서 동작해야하기 때문이죠.)

SCAN cursor [MATCH pattern] [COUNT count]
SSCAN key cursor [MATCH pattern] [COUNT count]
ZSCAN key cursor [MATCH pattern] [COUNT count]
HSCAN key cursor [MATCH pattern] [COUNT count]

샘플은 다음과 같습니다. 첫번째 결과가 다음 cursor 값인데 이것이 0이 나올때 까지 계속해서 호출하면 됩니다.

redis 127.0.0.1:6379> scan 0
1) "17"
2)  1) "key:12"
    2) "key:8"
    3) "key:4"
    4) "key:14"
    5) "key:16"
    6) "key:17"
    7) "key:15"
    8) "key:10"
    9) "key:3"
   10) "key:7"
   11) "key:1"
redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
   2) "key:18"
   3) "key:0"
   4) "key:2"
   5) "key:19"
   6) "key:13"
   7) "key:6"
   8) "key:9"
   9) "key:11"

기본적으로 10개를 가져오게 되어있고, 가져올 개수(Count) 를 지정해줄 수 있습니다. 모든 scan은 scanGenericCommand 라는 공통 함수를 이용해서 처리가 됩니다.

void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor) {
    ......
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == REDIS_SET && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == REDIS_HASH && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == REDIS_ZSET && o->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
        } while (cursor && listLength(keys) < count);
    } else if (o->type == REDIS_SET) {
        int pos = 0;
        int64_t ll;

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    } else if (o->type == REDIS_HASH || o->type == REDIS_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
            p = ziplistNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        redisPanic("Not handled encoding in SCAN.");
    }

    ......

위의 코드에서 핵심적인 부분은, 찾아야 하는 key들을 keys라는 리스트에 추가하고, 이 리스트를 돌면서 패턴에 맞는 것들을 삭제한 결과를 돌려줍니다. “우와 대박이다.” 라고 생각할 수
있는데, 여기서 주의할 것이 있습니다. 기본적으로 딱 정해진 개수만큼 가져오는것이 아니라는 겁니다. 당연히 Redis는 다양한 자료구조를 지원하고 있고, 속도를 위해서 같은 자료구조라도
구현 방식이 여러가지입니다. 예를 들어, sorted set은 SKIPLIST 형태로 구현된것이 잇고, ziplist 형식도 있습니다. 그래서 또한 동작방식도 조금씩 다릅니다.(그래서 사실 이 글을 쓰고
있는거긴 한데 말입니다.)

다음과 같은 경우를 주의해야합니다.
1] 기본적으로 scan 의 경우 table 의 한 블럭을 가져오는 것이라서, 여기에 개수가 많으면 시간이 많이 걸릴 수도 있다.(다만, 리해싱 테이블이 bitmasking 크기만큼 커지므로, 한 블럭이 극단적으로 커질 가능성은 높지 않습니다.)
2] set/sorted set/hash 의 내부 구조가 hash table 이나 Skiplist 가 아닐 경우(ziplist 로 구현되어 있을 경우), 한 컬렉션의 모든 데이터를 가져오므로, keys와 비슷한 문제가 그대로 발생할 수 있다.

위의 경우만 아니면 keys가 필요할 때 scan 명령을 사용하는 것은 꽤 좋은 선택이 될것 같습니다.

기본적으로 사용법은 다음과 같습니다.(python 기준입니다.)

import redis
r = redis.StrictRedis('localhost', port=2000)

init = 0
while(True):
    ret = r.scan(init)
    print init
    init = ret[0]
    print ret[1]
    if (init == '0'):
        break

[입 개발] Redis Pipeline and Transaction(Multi/Exec)

해당 블로그는 KT Olleh UCloudBiz의 지원을 받고 있습니다.

Redis Pipeline에 대한 내용을 보고 나면, 다음과 같은 질문이 꼭 나오게 됩니다. Pipeline을 이용하면 성능은 좋아지는데, Redis는 Single Thread 라, Pipeline으로 처리하는 동안에는 다른 클라이언트의 작업을 처리못하게 되는거 아닌가요? 라는 질문입니다.

먼저 결론부터 내리자면, Pipeline 형태로 데이터를 전달하더라도 중간에 명령을 수행할 수 있습니다.(진짜루?)

그럼 다음과 같이 간단하게 테스트를 해보도록 하겠습니다. 테스트에는 KT Olleh UcloudBiz 의 8vCore, 16GB 머신을 이용했습니다.

일단 지난번의 테스트 코드를 다시 사용합니다. 다만 테스트를 위해서 프로세스는 하나만 띄웁니다.

from multiprocessing import Process
import redis
import random
import string

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for x in range(size))

data = id_generator(512, "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()")

def f(idx):
    print 'PR', idx
    r = redis.StrictRedis(host='172.27.217.193', port=6379, db=0)
    pipeline = r.pipeline()
    for i in xrange(idx * 1024 * 1024 * 2, (idx+1)*1024*1024 * 2):
        key = 'PR%s'%(i)
        pipeline.get(key)
        if (i != idx * 0124 * 1024 and i%1024==0):
            pipeline.execute();

if __name__ == '__main__':
    print data
    pids = []
    for i in xrange(1):
        p = Process(target=f, args=(i,))
        pids.append(p)
        p.start()

    for pid in pids:
        pid.join()

이제 이 코드를 실행하고 모니터링을 해보면… 이상한 결과가 보여집니다.

redis-pipeline-with-transaction

분명히 문제 없다고 해놓고, 시간이 뻥뛰고 있습니다. redis-stat이라는 모니터링 툴 역시 내부적으로 info 명령을 주기적으로 전달하는 건데, 저렇게 시간이 확 지난거면… 중간에 명령이 실행이 안되는거지 않느냐!!! 라는 생각이 부글부글 드실껍니다.

여기에 비밀이 하나 있습니다. 제가 사용한 Python Redis Clinet 인 redis-py 가 Redis 의 다른 특성을 함께 사용하고 있어서 그렇습니다. 여기서 설명할 것이 Redis Transaction Multi/Exec 입니다. Transaction 이라는 것은 여러 개의 명령을 하나의 성공/실패의 그룹으로 묶어서 수행하는 것을 말합니다. 예를 들어 다음과 같은 명령들이 있다고 합니다.

set A 123
get A

당연히 기대하는 값은 “123” 이겠지만, set A 123 명령 다음에 set A 23이라는 명령을 다른 클라이언트에 의해서 실행이 되면 기대했던 결과를 얻을 수가 없습니다. 그래서 redis 2.6 부터는 MULTI/EXEC 라는 Transaction을 지원하기 위한 명령을 지원하고 있습니다. MULTI/EXEC 의 특징은 그 안의 명령을 모두 한꺼번에 실행시켜준다라는 특성이 있습니다. 즉

MULTI
set A 123
get A
EXEC

의 마지막 결과는 항상 123 입니다. 이것의 구현은 어떻게 되는 것일까요? 아주 간단합니다. MULTI가 나오면 Redis 가 해당 Client로 할당된 큐에 EXEC가 나오기 전까지 모든 명령을 담아두고 있다가 EXEC 명령이 들어오면, 그냥 모두 한꺼번에 수행시켜줍니다. 그래서 이 안에 blocking 관련 BLPOP등의 명령은 데이터가 없으면 바로 실패하게 됩니다. 그리고 MUTLI/EXEC 사이에 명령이 많으면, 이걸 수행하기 위해서, 그 동안에는 다른 클라이언트의 명령을 처리하지 못합니다.(Single Thread 라…)

왜 이 설명을 했냐 하면, Python Redis Client 는 기본적으로 pipeline을 실행할 때, MULTI/EXEC를 사용합니다.(이 특성 때문에 redis-py 의 pipeline 기능을 twemproxy와 함께 사용하려고 하면 지원하지 않는 명령이라 제대로 실행이 되지 않습니다.)

그럼 어떻게 해야 하는가? 호출시에 transaction=False 를 넣어주면 됩니다. 다음과 같습니다.

def f(idx):
    print 'PR', idx
    r = redis.StrictRedis(host='172.27.217.193', port=6379, db=0)
    pipeline = r.pipeline(transaction=False)
    for i in xrange(idx * 1024 * 1024 * 2, (idx+1)*1024*1024 * 2):
        key = 'PR%s'%(i)
        pipeline.get(key)
        if (i != idx * 0124 * 1024 and i%1024==0):
            pipeline.execute();

자 이렇게 바꾸고 다시 실행 후 모니터링 해보도록 하겠습니다.

redis-pipeline-without-transaction

자, 이제 2초마다 제대로 모니터링 되는 걸 알 수 있습니다. 즉 제대로 중간에 명령이 끼어들 수 있다는 겁니다.
(제가 거짓말 한게 아닙니다.)

정리하면 다음과 같습니다.
1. Redis Pipeline은 중간에 명령을 허용한다. 즉 이로 인해 뭔가 데이터가 바뀔 수도 있다.(다른 클라이언트가 변경할 수 있으니…)
2. Multi/Exec를 쓰면, 그 사이의 명령이 전부 한꺼번에 수행된다. 이 안의 작업이 길면 다른 클라이언트들의 명령이 늦게 처리된다.
3. 사용시에 라이브러리를 잘 확인하자. 위와 같은 이유로 안될 수 있다.

[입 개발] Redis Pipeline

해당 블로그는 KT Olleh UCloudBiz의 지원을 받고 있습니다.

가끔씩 Redis의 성능을 좀 더 극대화 시키고 싶어하시는 분들이 있으시기 때문에, 오늘은 이에 가장 많이 사용되는 Pipeline에 대해서 설명을 할려고 합니다.

먼저 재미난 사실을 알려드리자면, Redis pipeline이라는 것이 성능 향상을 위해서 유리하다라는 것은 많은 분들이 알고계시지만, 사실, 서버에는 Redis pipeline 라는 기능이 없다는 것입니다.

다음과 같은 명령을 Redis 소스에서 확인해보면, redis-benchmark.c 와 redis-cli.c를 빼고는 아무데도 pipe라는 글자자체가 없다는 것을 알 수 있습니다.

grep pipe src/* -Rn

그렇다면 Pipeline의 정체는 무엇일까요? 먼저 Pipeline을 적용한 테스트와 Pipeline을 적용하지 않은 테스트 결과를
보여드리도록 하겠습니다.

언어는 python 에 KT Olleh UCloudBiz 의 8vcore 16GB 메모리 모델을 사용했습니다.
두 대를 사용해서 한대는 redis-server unstable 버전이 설치되어 있고, 다른 서버에서 접속한 버전입니다.

from multiprocessing import Process
import redis
import random
import string

def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for x in range(size))

data = id_generator(512, "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()")

def f(idx):
    print 'PR', idx
    r = redis.StrictRedis(host='172.27.217.193', port=6379, db=0)
    for i in xrange(idx * 1024 * 1024, (idx+1)*1024*1024):
        key = 'PR%s'%(i)
        r.get(key)

if __name__ == '__main__':
    print data
    pids = []
    for i in xrange(80):
        p = Process(target=f, args=(i,))
        pids.append(p)
        p.start()

    for pid in pids:
        pid.join()

redis-not-pipeline위의 그림을 보면 55k 에서 65k 수준의 cmd/s 를 볼 수 있습니다.

이제 Pipeline을 적용한 테스트를 한번 보도록 하겠습니다. 다른 부분은 동일하고 함수 f()만 조금 수정되었습니다.

def f(idx):
    print 'PR', idx
    r = redis.StrictRedis(host='172.27.217.193', port=6379, db=0)
    pipeline = r.pipeline()
    for i in xrange(idx * 1024 * 1024, (idx+1)*1024*1024):
        key = 'PR%s'%(i)
        pipeline.get(key)
        if (i != idx * 0124 * 1024 and i%1024==0):
            pipeline.execute();

redis-pipeline

물론 거의 200k 전후로 cmd/s 가 나오는 것을 볼 수 있습니다. 도대체 pipeline은 무엇이길래, 이런 속도차를 보여주는 것일까요?

사실 pipeline을 이용해서 가지게 되는 이점은 다음과 같습니다.
1. 명령들이 뭉쳐서 전달되기 때문에, 클라이언트 입장에서는 함수 콜이 줄어들고, 서버 입장에서도 I/O가 적게 발생합니다.
2. 보통 하나의 명령을 보내면 결과를 확인하는 시점이 있는데, 이미 서버에서는 다음 명령이 실행되고 있기 때문에 이런 시간이 줄어듭니다.

실제로 memcached 나 redis를 쓸 때, 한 서버와 통신할 때 mget등을 이용해서 명령어 전송 수를 줄이는 것도 일종의 튜닝 방법입니다. 사실 서버입장에서는 명령어가 뭉쳐서 들어와서 시스템 콜이 줄어드는 이점 이외에는 아무런 차이가 없지만… 심할때는 위와 같은 차이가 나는 것입니다.