[입 개발] Jedis 2.1.0 을 가지고 삽질한 이야기…

최근에… “왜 안되지!!!” -> “헉… 왜 됬지!!!” -> “왜 안되지!!!” -&gt “마음의 평안”을 가진 일이 있습니다. 그게 바로 Jedis 2.1.0을 쓰면서 발생한 일입니다. -_-;;; 저는 레디스 꼬꼬마기 때문에… 여러 개의 Request 가 갈 수도 있으니…pipeline을 이용해야지라고 결정을 했습니다. 참고로 현재 Jedis 버전은 2.8.2 와 2.9.0 입니다.(즉 2.1.0 은 아주아주 예전 버전입니다.)

그런데… 이상한 일은 Redis에 데이터를 집어넣는 아주 간단한 작업인데… 데이터가 늦게 들어가는 것이었습니다. 즉 해당 함수를 호출하면, 이상하게 함수를 호출 한 뒤, 몇 초 후에 해당 명령어가 Redis Monitor를 통해서 실행되는 것을 볼 수 있었습니다.

이 때 부터는 “왜 안되지!!!” 모드였습니다.

현재 버전까지는 Jedis 가 Future를 이용해서 비동기 실행을 지원하는 구조가 아니기 때문에, 아주 옛날 버전인 2.1.0 에서는 당연히 비동기가 안 될것은 자명한 일이었습니다.

그런데… 결국 제가 pipeline 에서 sync 함수를 사용해야 하는데 exec 를 사용했기 때문에 발생한 이슈라는 것을 깨닫게 되었습니다.(역시 사람은 낮잠을 자야…) 그런데… 이게 문제다라는 것을 깨달은 순간부터…

그럼 Jedis의 2.1.0 에서의 sync 와 exec 함수를 간단하게 살펴보도록 하겠습니다. 먼저 기본적으로 Jedis는 응답을 바로 주지만, pipeline 모드에서는 Response 구조체에 값을 넣어주게 됩니다.

    public void sync() {
        List<Object> unformatted = client.getAll();
        for (Object o : unformatted) {
            generateResponse(o);
        }
    }

    public Response<List<Object>> exec() {
        client.exec();
        Response<List<Object>> response = super.getResponse(currentMulti);
        currentMulti = null;
        return response;
    }

위의 코드 처럼 sync는 그냥 던진 리퀘스트에 대한 응답을 모두 생성하게 됩니다. getAll 함수는 inputStream 으로 들어온 응답들을 파싱해서 돌려주게 됩니다.

    public List<Object> getAll(int except) {
        List<Object> all = new ArrayList<Object>();
        flush();
        while (pipelinedCommands > except) {
        	try{
                all.add(Protocol.read(inputStream));
        	}catch(JedisDataException e){
        		all.add(e);
        	}
            pipelinedCommands--;
        }
        return all;
    }

그리고 pipeline은 아래의 Queable 을 상속 받기 때문에, 위에서 getAll로 받은 결과들을 각 Response에 채워줍니다.

package redis.clients.jedis;

import java.util.LinkedList;
import java.util.Queue;

public class Queable {
    private Queue<Response<?>> pipelinedResponses = new LinkedList<Response<?>>();

    protected void clean() {
        pipelinedResponses.clear();
    }

    protected Response<?> generateResponse(Object data) {
        Response<?> response = pipelinedResponses.poll();
        if (response != null) {
            response.set(data);
        }
        return response;
    }

    protected <T> Response<T> getResponse(Builder<T> builder) {
        Response<T> lr = new Response<T>(builder);
        pipelinedResponses.add(lr);
        return lr;
    }
}

그렇기 때문에 sync를 했다면, 제대로 실행되었겠지만, 제가 호출한 exec()는 multi()의 쌍을 위한 것이므로 완전히 잘못되었다고 볼 수 있습니다.

이 때 부터, 이제 “왜 됬지!!!!!!” 모드로 변환하게 됩니다. 이 때도 물론 전 좀 잘못된 지식을 가지고 있었습니다. sync를 할 때 명령이 실행될 것이다라고 생각한거죠. 그러니 sync 도 안했는데… 왜!!! exec는 exec 커맨드만 보냅니다. -_-(아마 여기서는 에러 응답이…)(자꾸 pipeline 곽 multi/exec 를 헷갈리고 있습니다.)

그런다가 다시 jedis를 소스를 따라가보니… 또 다시 충격이… set을 실행하면 다음과 같이 client.set을 실행합니다.

    public Response<String> set(String key, String value) {
        client.set(key, value);
        return getResponse(BuilderFactory.STRING);
    }

다시 client의 set을 따라가 봅시다. 헉 sendCommand 가 있습니다. 실제로 보낸다는 얘기죠.

    public void set(final byte[] key, final byte[] value) {
	sendCommand(Command.SET, key, value);
    }

sendCommand 는 실제로 Protocol.java 의 sendCommand를 호출합니다.

    private static void sendCommand(final RedisOutputStream os,
	    final byte[] command, final byte[]... args) {
	try {
	    os.write(ASTERISK_BYTE);
	    os.writeIntCrLf(args.length + 1);
	    os.write(DOLLAR_BYTE);
	    os.writeIntCrLf(command.length);
	    os.write(command);
	    os.writeCrLf();

	    for (final byte[] arg : args) {
		os.write(DOLLAR_BYTE);
		os.writeIntCrLf(arg.length);
		os.write(arg);
		os.writeCrLf();
	    }
	} catch (IOException e) {
	    throw new JedisConnectionException(e);
	}
    }

이것만 보고는 다시 -_- “왜 안되지!!!” 로 되돌아가게 됩니다. 코드만 보면 pipeline도 바로 실행이 되기 때문에, 제가 호출한 시점에 set 명령이 동작했어야 하기 때문입니다. 그리고 유심히 코드를 보다가 RedisOutputStream 을 보고서야 그 의문이 풀렸습니다.

RedisOutputStream 은 java/redis/clients/util/RedisOutputStream.java 에 있습니다. 아래는 RedisOutputStream의 생성자입니다. 일단 8192 bytes를 buf 라는 이름으로 할당합니다.
그리고 write 함수를 보면 buf에 commands를 저장합니다. 그리고 이 데이터를 실제 쓰는 시점은 flushBuffer를 호출할 때입니다.

    public RedisOutputStream(final OutputStream out, final int size) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    public void write(final byte b[], final int off, final int len) throws IOException {
        if (len >= buf.length) {
            flushBuffer();
            out.write(b, off, len);
        } else {
            if (len >= buf.length - count) {
                flushBuffer();
            }

            System.arraycopy(b, off, buf, count, len);
            count += len;
        }
    }

    private void flushBuffer() throws IOException {
        if (count > 0) {
            out.write(buf, 0, count);
            count = 0;
        }
    }

이 얘기는 반대로 flushBuffer 가 호출되지 않을때는 명령이 전달되지 않는다는 것입니다. write 함수를 보면 데이터가 buf size(8192) 보다 커져야 할 때 실제로 명령을 전달하지 않습니다. IO를 줄이기 위해서죠.

즉, 제가 보낸 set 명령은 8192 바이트보다 적어서 flushBuffer 가 호출되지 않아서 전달되지 않다가, 나중에 이 버퍼가 차거나 다른 이유로 그 때 전달된 것입니다. 이제 정신 상태가 “마음의 평화” 상태로 전이되었습니다.

그런데 한가지 의문이 들 수 있습니다. client 의 sendCommmand를 바로 호출한 흐름과 동일하므로 평소에는 어떻게 바로 결과를 얻을 수 있을까요?

그것은 Jedis 가 get관련 함수가 있거나 응답을 바로 읽어야 할 때, flush 함수를 임의로 호출 해 주기 때문입니다. 즉, 바로 결과를 읽어야 할 시점에는 buf 에 있는 데이터를 전부 전달하고 그 결과를 받아오게 됩니다.

정리하자면, 제 set 명령은 buf 사이즈가 차지 않아서 buf에만 들어가고 실행되지 않고 리턴 된 것입니다. 그냥 set 명령을 호출했으면 바로 결과를 보기 위해서 getStatusCodeReply() 함수등이 실행되면서 명령이 전달되었을텐데, pipeline이라 바로 결과를 읽지 않기 때문에 발생한 것입니다.
그래서 다른 명령들이 해당 클라이언트를 사용해서 8192 바이트가 넘거나 해당 명령이 결과를 얻을려고 할 때 실제로 수행이 됩니다. 만약 다른 명령이 응답을 읽으려고 했다면, 버퍼에 들어가 있던 set 명령의 결과를 받아가므로 실제로는 익셉션이나 다른 값을 얻게 되었을겁니다.

결과적으로 Jedis는 multi 가 호출되지 않으면 exec 에서 Exception을 내도록 수정이 되었으므로, 그 뒤에는 이런 문제를 겪을 일이 없습니다. -_-;;; 저 혼자 삽질을 쭈욱쭈욱 했네요. 흑흑흑, 처음에는 딴 이슈인줄 알고 보다가… 엄청나게 삽질한 케이스입니다. T.T