[입개발] Druid에서 transform 시 알아야 할 팁.

Apache Druid 는 빠른 속도로 데이터를 Aggregation 할 수 있는 툴이지만, 처음 사용하면 이것저것 미묘하게 힘든 부분들이 있다.

다음과 같은 걸 기억해두자. transform에서 오래걸렸던 부분은 컬럼명은 더블 쿼터로 “__time” 이런식으로 그리고 날짜 포맷부분등은 리터럴이라 쿼터로 ‘yyyy-MM-dd’ 형식으로 감싸야 한다는 것이다. TimeZone 도 리터럴이다.


{
"type": "index_parallel",
"spec": {
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"prefixes": [
"s3://test-bucket/path1/path2/2020-06-30/"
]
},
"inputFormat": {
"type": "parquet"
},
"appendToExisting": true
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "dynamic"
},
"maxNumConcurrentSubTasks": 4
},
"dataSchema": {
"dataSource": "test_druid_log",
"granularitySpec": {
"type": "uniform",
"queryGranularity": "HOUR",
"rollup": true,
"segmentGranularity": "HOUR"
},
"timestampSpec": {
"column": "timestamp_column",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"id",
"date"
"name",
"email",
"country",
"user_id",
"service"
]
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "date",
"expression": "timestamp_format(\"__time\", 'yyyy-MM-dd', 'Asia/Seoul')"
}
]
}
}
}
}

[Tip] Spring Boot 2.1.0 에서의 CORS 설정

뭔가 내가 손대면 안되는 건지… 인터넷에서 찾으면 꽤 많은 방법이 나온다.

좀 예전 글이긴 하지만 다음 문서를 보면 XML로 설정하는 방법

<mvc:cors>
<mvc:mapping path="/api/"
allowed-origins="http://domain1.com, http://domain2.com
allowed-methods="GET, PUT
allowed-headers="header1, header2, header
exposed-headers="header1, header2" allow-credentials="false"
max-age="123" />
<mvc:mapping path="/resources/
"
allowed-origins="http://domain1.com/
</mvc:cors>

아니면 addCorsMappings 을 override 하는 방법

@Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**").allowedOrigins("http://localhost:4200");
            }

아니면 corsFilter() 함수를 Bean으로 노출하는 방법

@Bean
	public FilterRegistrationBean corsFilter() {
		UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
		CorsConfiguration config = new CorsConfiguration();
		config.setAllowCredentials(true);
		config.addAllowedOrigin("http://domain1.com");
		config.addAllowedHeader("*");
		config.addAllowedMethod("*");
		source.registerCorsConfiguration("/**", config);
		FilterRegistrationBean bean = new FilterRegistrationBean(new CorsFilter(source));
		bean.setOrder(0);
		return bean;
	}


 그런데 xml은 내가 시도해보지 않았고, 2,3번은 다 안되었다.(왜 안되는지는 시간이 없어서 확인 안함...)
최후에 되는 방법은 그냥 Filter를 이용한 방법이다. 왜 이것만 되지 -_-;;;


@Component
@Slf4j
public class CORSFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
        HttpServletResponse resp = (HttpServletResponse) response;
        resp.setHeader("Access-Control-Allow-Origin", "*");
        resp.setHeader("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE");
        resp.setHeader("Access-Control-Max-Age", "3600");
        resp.setHeader("Access-Control-Allow-Headers", "*");
        chain.doFilter(request, response);
    }

    @Override
    public void destroy() {

    }
}

[Tip] Finatra Default HttpClient 사용하기

Finatra 에서 Default HttpClient 사용하기. dest가 실제로 접속할 호스트로 설정되는 부분이다. hostname인줄 알고 했다가 계속 실패함. 이렇게 하고 HttpClient를 inject 하면 해당 설정을 이용한 걸고 injection이 되게 됨.

object DruidHttpClientModule extends HttpClientModuleTrait {
  //endpoint_url = "localhost:8080"
  private val config = ConfigFactory.load()
  override val dest = config.getString("endpoint_url")
  override val label = "Druid"

  override def defaultHeaders: Map[String, String] = Map("host" -> dest)
  @Provides
  @Singleton
  final def provideHttpClient(
      injector: Injector,
      statsReceiver: StatsReceiver,
      mapper: ScalaObjectMapper
  ): HttpClient = newHttpClient(injector, statsReceiver, mapper)
}

@Singleton
class DruidClient @Inject()(httpClient: HttpClient, mapper: ScalaObjectMapper) {
    def query[REQUEST, RESPONSE: Manifest](request: REQUEST): Future[Option[RESPONSE]] = {
        for {
            response <- httpClient.execute(RequestBuilder.post("/druid/v2/sql").body(mapper.writeValueAsString(request)))
        } yield {
            if (response.contentString == "null") {
                None
            } else {
               Some(mapper.parse[RESPONSE](response.contentString))
            }
        }
    }

    def query[REQUEST](request: REQUEST): Future[Option[String]] = {
        for {
            response <- httpClient.execute(RequestBuilder.post("/druid/v2/sql").body(mapper.writeValueAsString(request)))
        } yield {
            if (response.contentString == "null") {
                None
            } else {
                Some(response.contentString)
            }
        }
    }
}

[Tip] HBase Asynchbase 의 Defer를 Future로 변경하기

OpenTSDB에서 만든 Asynchbase는 Future가 아닌 Defer를 이용합니다. 이를 Future로 바꾸는 함수는 다음과 같습니다. 이 때 주의할 것은 HBase는 protobuf-java 2.5.0을 쓰고 있기 때문에,
현재 다른 library에서 다른 protobuf-java 버전을 쓰면 문제가 발생할 수 있습니다.


import java.nio.charset.StandardCharsets

import org.hbase.async._

import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext
import com.twitter.util.{Future, Promise, Return, Throw}
import com.stumbleupon.async.{Callback, Deferred}

object Test {
  implicit val ec = scala.concurrent.ExecutionContext.global

  implicit def futureFromDeferred[A](d: Deferred[A]): Future[A] = {
    val promise = new Promise[A]

    d.addCallback(new Callback[Unit, A] {
      def call(arg: A) = (promise() = Return(arg))
    })

    d.addErrback(new Callback[Unit, Throwable] {
      def call(arg: Throwable) = (promise() = Throw(arg))
    })

    promise
  }

  def makeClient() = {
    val client = new HBaseClient("localhost")
    client
  }

  def test(key: String) = {
    val defer = futureFromDeferred(client.get(buildRequest(key)))
    for {
      keyValue <- FutureOption(defer)
      v <- Future.value(Some(
        new String(keyValue.head.value(), StandardCharsets.UTF_8)))
    } yield v
  }

  def buildRequest(key: String)(implicit ec: ExecutionContext) = {
    val get =
      new GetRequest(tableName, key)
    get.qualifier("cf1:content_id")
  }
}

[입 개발] Redis 6.0 – ThreadedIO를 알아보자.

안녕하세요. 입개말만 하는 CharSyam 입니다. 이번에 Redis Version 6.0.x 가 출시되었습니다. Redis 5.0에서도 Stream 등 새로운 기능이 들어왔었는데, 이번 6.0에서도
ACL 및 여러가지 기능들이 들어왔습니다. 그 중에서도 많은 사람들이 관심있어 하는 것이 바로 ThreadedIO 입니다. Redis를 쓰는 사람들은 Redis에 가졌던 많은 불만 중에 하나가
왜 Multi Thread를 지원하지 않는가였습니다. 그러면 성능이 훨씬 높아질텐데라는 생각을 하면서요. 반대로 Multi Thread를 지원하면 기존의 Redis의 특징중에 하나였던 Atomic을
어떻게 보장할 것인가도 의문이이었습니다.

그렇다면 지금 Redis 6.0에서는 이걸 어떻게 지원했는가? 그리고 어떻게 동작하는가가 이번 잡글의 주제입니다.

다음은 Redis 커미터인 Antirez의 트윗입니다. Redis의 ThreadedIO는 복잡하지 않으면서도 2.5배 정도 빨라졌다는 것입니다. 그렇다면 어떻게 이렇게 됬을까요?

일단 결론부터 말씀드리면, Redis 의 ThreadedIO가 적용되는 부분은 다음과 같습니다.

  • 클라이언트가 전송한 명령을 네트웍으로 읽어서 파싱하는 부분
  • 명령이 처리된 결과 메시지를 클라이언트에게 네트웍으로 전달하는 부분

위의 두 개에서 여전히 명령의 실행 자체는 빠져 있습니다. 넵 Redis 6.0의 ThreadedIO는 여전히 명령의 실행은 Single Threaded 입니다. 즉 기존에 문제가 되던 Atomic이 깨지지 않았다는 것입니다.

그렇다면 Redis에서 Threaded IO 가 어떻게 구현되었는지 살펴보도록 하겠습니다.

먼저 이전까지 완벽한(?) Single Threaded 형식의 Redis Event Loop 입니다. 즉 여기서는 하나의 이벤트 루프에서 IO Multiplexing을 이용해서 Read/Write 이벤트를 받아오고, Read 이벤트가 발생하면 네트웍에서 패킷을 읽고, 명령이 완성되면 실행이 되었습니다.

그런데 새로운 Redis 6의 Threaded IO는 조금 모양이 다릅니다. 실제로 IO Multiplexing 작업을 하고 나면, 이 이벤트들이 발생한 클라이언트를 다음과 같은 리스트에 저장합니다.

(여기서 write는 조금 복잡합니다. IO Multiplexing에 의해서 Write Event 가 발생해서 해당 리스트에 저장하는게 아니라, addReply 계열이 호출되었을 때, clients_pending_write에 저장되게 됩니다.)

  • Read 이벤트 : server.clients_pending_read
  • Write 이벤트 : server.clients_pending_write

그리고 이제 beforeSleep에서 (beforeSleep은 매틱 마다 IO Multiplexing 전에 호출 되는 함수입니다.) 다음과 같은 두 개의 함수를 호출합니다.

  • handleClientsWithPendingReadsUsingThreads
  • handleClientsWithPendingWritesUsingThreads
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    ......
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    ......


    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ......
}

먼저 handleClientsWithPendingReadsUsingThreads 함수 부터 알아보도록 하겠습니다. 주석을 추가합니다.
먼저 clients_pending_read 리스트에서 하나씩 항목을 가져옵니다. 하나 처리할 때 마다 item_id 를 증가합니다.
해당 클라이언트를 다음 공식에 의해서 io_threads에 할당합니다.

target_id = item_id % server.io_threads_num

그리고 해당 스레드들이 작업을 하도록 trigger를 켜고, 작업이 완료되길 기다립니다.
여기서는 Read 이벤트들만 처리가 됩니다. IO 스레드들의 모든 작업이 끝나면, 이제 다시 client_pending_read 리스트를 순회하면서, 명령이 완성되었으면 명령을 처리합니다. 여기서 processCommand가 모두 처리되므로, Atomic이 유지됩니다.

/* When threaded I/O is also enabled for the reading + parsing side, the
 * readable handler will just put normal clients into a queue of clients to
 * process (instead of serving them synchronously). This function runs
 * the queue using the I/O threads, and process them in order to accumulate
 * the reads in the buffers, and also parse the first command available
 * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //해당 공식을 통해서 특정 스레드에서 처리되도록 할당된다.
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        //Round Robin 방식으로 스레드에 할당
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        //io_threads 는 각각의 io_threads_pending 값이 1보다 크면 동작하게 되므로 실행을 Trigger 한다.
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    // io_threads_list[0] 번은 현재의 main thread, main thread도 일하도록 균등하게 할당됨.
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    //모든 스레드의 작업이 끝나길 대기한다. 위에서 io_threds_pending의 값이 trigger 였다는 걸 기억하자.
    //0이되면 스레드의 작업이 종료된 것이다.
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        //IO 스레드의 작업으로 명령이 파싱된 것들은 모두 CLIENT_PENDING_COMMAND가 붙게 된다.
        //이제 Main Thread에서는 이렇게 붙은 애들만 처리하면 된다. 
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        //psync2 때문에 추가된 부분 https://github.com/antirez/redis/commit/4447ddc8bb36879db9fe49498165b360bf35ba1b
        //그런데 이 코드가 없었으면, 위에서 하나의 패킷에 여러개의 명령이 들어왔다면, 뒤의 명령이, 다음 패킷이 들어올때 까지 처리되지 않는 이슈가
        //있었을 것으로 보인다. 
        processInputBuffer(c);
    }
    return processed;
}

handleClientsWithPendingWritesUsingThreads 함수도 거의 동일한 로직입니다.

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
 
    //같은 로직으로 타켓 클라이언트를 IO 스레드 큐에 분배합니다.
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // Write 이벤트만 처리한다고 설정합니다.
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    //처리하지 못한 부분은 여기서 다시 handler를 걸어줍니다. 이 녀석들은 다음번 IO Multiplexing 때에 실행됩니다.
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

여기서 재미있는 부분 중에 하나는 io_threads_op 입니다. Redis의 IO Thread 들은 이 변수에 지정된 타입만 수행을 합니다. 코드를 보시면

list *io_threads_list[IO_THREADS_MAX_NUM];

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        //io_threads_pending 값을 보고 작업을 수행한다. 0보다 커야만 동작
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);

            //io_threads_op를 보고 write를 할지, read를 할지 결정한다.
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

저 부분들이 어떻게 client_pending_read, client_pending_write 에 들어가는지는 좀 복잡합니다. 다음번에 쓸지 안쓸지는 ㅎㅎㅎ…
코드를 읽다보니, 조금 애매한 부분들이 있긴 한데, 실제로 성능이 2,3배 좋아졌다고 하니…

정리하면, Redis 의 ThreadedIO가 적용되는 부분은 다음과 같습니다. 그리고 그 구조상 ProcessCommand는 여전히 main thread에서만 실행되기 때문에 Atomic 합니다.
다시 Redis에서 ThreadedIO가 적용되는 부분은 다음과 같습니다. Redis 5에서는 lazy free등이 설정에 따라서 Thread로 동작하긴합니다.

  • 클라이언트가 전송한 명령을 네트웍으로 읽어서 파싱하는 부분
  • 명령이 처리된 결과 메시지를 클라이언트에게 네트웍으로 전달하는 부분

그런데 이 부분이 도입되고 왜 반응성이 좋아졌는가? 실제로 명령이 수행되는 스레드는 한개가 아닌가? 라는 질문이 생길껍니다. 제 생각(?)을 말씀드리자면, 그 만큼 레디스가 많이 사용될때는
클라이언트들의 리퀘스트가 처리되지 않고 네트웍에 대기하고 있거나, 수행되었지만, 응답을 받지 못해서 늦게 처리되는 부분들이 줄어들어서라고 생각합니다. 레디스를 사용하다보면, Redis에는
slow log가 없지만, 클라이언트 단에서는 200~300ms 이상 걸릴때가 종종 생기는데, 이런 이슈가 줄어들것으로 보이고, 여기서 걸리던 시간 만큼 더 명령을 처리하게 되었으니… 실제 처리량도
늘어날듯 합니다.(스레드를 더 쓰니, 더 속도가 빨라지긴 해야 ㅎㅎㅎ)

그럼 모두들 고운하루되세요. Antirez의 말대로 코드량은 작은대, 중복으로 호출되면서 Flags에 따라서 그냥 설정만 하고 리턴하는 부분들이 있어서… 좀 코드는 까다롭네요.
write처리도 대규모 write가 발생할 때는 좀 비효울적인 부분이 있을 수 있는…

[입 개발] airflow 의 schedule_interval 에 대해서

일단 저는 airflow에 대해서는 초초초초초보입니다. 현재 airflow를 조금씩 사용해보고는 있지만, 아직 써본지 얼마 안된… 내부구조도 모르고 어떻게 돌아가는지도 잘 모릅니다. 그 전에는 pinball(pinball)을 몇년 썼지만, 아… pinball도 내부 구조는 거의 모르는 초초초초초초보입니다.(흑 나란 남자… 쓰레기…)

airflow는 참 좋은거 같은데, 최초에 만나는 장벽이 바로 TimeZone 이슈입니다. airflow를 만든 수 많은 역촋들은 시간 이슈로 많은 고생을 하셨는지, 세상은 글로벌(글로 벌을 선다는 의미죠.) 다른 시간대는 다 집어쳐버려라는 마인드로… UTC만 이용하도록 만들었습니다.(물론 실제로 이런 이유는 절대로 아닙니다.)

이번에 airflow 1.10.10 부터는 드디어 UI에 UTC가 아닌 다른 TimeZone을 보여주도록 기능이 추가되었습니다.([AIR-8046]) 그런데 이 1.10.10 은 4/9일에 릴리즈가 되었고, 화면의 UI만 바꿔주는 걸로 보입니다.

앞에도 말했지만 airflow를 처음쓰게 되면 생기는 이슈가 바로 이 시간대와 execution_date 인데요.(사실 저는 아직도 잘 이해하지 못했습니다.) 여기에 대해서 정말 잘 설명한 글이 있어서 Link를 공유합니다.

그래서 저걸 이해하고 나명 이제 남은것은 default_args 의 start_date와 end_date 입니다.
아래와 같은 값을 설정하면 datetime(2020, 4, 16) 은 UTC 기준입니다. 우리네 KST와는 다르죠. 9시간 느립니다.
즉 한국시간으로는 2020-04-16 09:00:00 이 됩니다.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 16),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

그래서 보통 한국시간으로 코드를 보기 위해서, 다음과 같이 타임존을 지정합니다.

KST = pendulum.timezone("Asia/Seoul")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 16, tzinfo=KST),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

이제 시작 시간이 한국시간으로 2020-04-16 00:00:00 이 되었습니다. 그런데 여기에 airflow는 schedule_interval 이라는 값이 추가가 됩니다.

schedule_interval 은 cron 형식과 동일하게 사용할 수 있습니다. 사용법 자체는 설명하지 않습니다. 옐르 들어
“0 17 * * *” 라고 하면 매일 17시에 실행을 하라는 것입니다.(딱 그 시간에 시작되지는 않습니다.) 그런데 이러면… 이 17시의 기준이 되는 timezone은 무엇일까요?

넵, 정답은… BYC~~~ 라고 하면 맞아 죽을꺼고 아까 제가 airflow의 timezone 철학은 UTC라고 했습니다라고 생각하시면, 경기도 오산의 오산이고, 구로구 오류동의 오류입니다.

정답부터 얘기하자면 schedule_interval은 startdate 의 timezone에 영향을 받습니다. 당연히 자세한 설명은 피하고 코드를 봅니다. 일단 복잡하니, 중간과정은 다 생략하고 following_schedule 라는 함수를 봅니다.

    def following_schedule(self, dttm):
        """
        Calculates the following schedule for this dag in UTC.

        :param dttm: utc datetime
        :return: utc datetime
        """
        if isinstance(self.normalized_schedule_interval, str):
            # we don't want to rely on the transitions created by
            # croniter as they are not always correct
            dttm = pendulum.instance(dttm)
            naive = timezone.make_naive(dttm, self.timezone)
            cron = croniter(self.normalized_schedule_interval, naive)

            # We assume that DST transitions happen on the minute/hour
            if not self.is_fixed_time_schedule():
                # relative offset (eg. every 5 minutes)
                delta = cron.get_next(datetime) - naive
                following = dttm.in_timezone(self.timezone).add_timedelta(delta)
            else:
                # absolute (e.g. 3 AM)
                naive = cron.get_next(datetime)
                tz = pendulum.timezone(self.timezone.name)
                following = timezone.make_aware(naive, tz)
            return timezone.convert_to_utc(following)
        elif self.normalized_schedule_interval is not None:
            return dttm + self.normalized_schedule_interval

위의 메서드에서 다른 건 어려우니 following(다음 스케줄 시간)을 구하는 코드만 보겠습니다. 코드는 뭔가 차이가 나지만
둘다 self.timezone을 사용하고 있습니다.

following = dttm.in_timezone(self.timezone).add_timedelta(delta)
......
tz = pendulum.timezone(self.timezone.name)
following = timezone.make_aware(naive, tz)

그럼 self.timezone이 어디서 오는지 살펴보면 되겠군요.

        # set timezone from start_date
        if start_date and start_date.tzinfo:
            self.timezone = start_date.tzinfo
        elif 'start_date' in self.default_args and self.default_args['start_date']:
            if isinstance(self.default_args['start_date'], str):
                self.default_args['start_date'] = (
                    timezone.parse(self.default_args['start_date'])
                )
            self.timezone = self.default_args['start_date'].tzinfo

        if not hasattr(self, 'timezone') or not self.timezone:
            self.timezone = settings.TIMEZONE

네 그렇습니다. self.timezone 은 위와 같은 코드로 결정됩니다. start_date가 있으면 여기서 timezone 정보를
가져옵니다. 그리고 그게 없으면 settings.TIMEZONE 을 가져옵니다. 마지막으로 조금 더 살펴보면 airflow.cfg 에서
default_timezone 을 셋팅하면 지정한 값으로 해당 TIMEZONE이 셋팅됩니다. system으로 해두면 자동 시스템 로컬로
설정이 되네요.

TIMEZONE = pendulum.timezone('UTC')
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        TIMEZONE = pendulum.local_timezone()
    else:
        TIMEZONE = pendulum.timezone(tz)
except Exception:
    pass

아주 당연한 얘기지만, airflow 에서 이런 값들이 어떻게 동작하는 지 알아야만, 내가 원하는 시간에 스케줄이 되도록
스케줄 설정이 가능합니다. 이 timezone 말고 execution_date가 어떤건지를 잘 아셔야 정확하게 수행이 가능하니
위의 링크의 글을 꼭 잘 읽어보시길 바랍니다. airflow 초보는 힘드네요.

[입 개발] 왜 Redis 응답이 느린데, slowlog에는 안찍히나요?

Redis를 쓰다보면, 확실히 응답이 늦게 와서, Client 에서는 timeout이 걸리는데, 실제 Redis의 slowlog를 보면 아무런 정보가 없을 때가 많습니다.

오늘은 Redis에서 느린건 확실한거 같은데 왜 slowlog는 남지 않는지 살짝 알아보도록 하겠습니다. 먼저 느리다는 것은 어떤 의미일까요? 느리다는 것 자체는 우리가 기대(또는 예상) 하는 속도보다 더 시간이 걸리는 것으로 얘기할 수 있습니다. 영어로는 latency가 얼마인가? 이런식으로도 표현을…

그런데 이 latency가 얼마냐? 라는 것은 어떻게 측정하는가에 따라서 달라집니다. 어떻게 보면 상대적인 거죠. 위에서 얘기했듯이, 분명히 클라이언트에서는 timeout이 걸릴정도로 느릴 수 있는데, Redis 입장에서는 slowlog가 쌓이지 않는다는 것은 Redis 입장에서는 이게 느리지 않다라는 것입니다.(여기서 살짝 뭔가를 덧붙이면, 사실 Redis가 응답을 늦게 주는 건 맞습니다. 그러나 Redis는 일찍 줬다고 측정하는거죠. 왜?)

먼저, 이 속도를 측정하는 구간에 대해서 이야기를 해보겠습니다. 일반적으로 우리는 다음과 같은 상황으로 측정합니다. 보통 클라이언트 입장에서 latency 라고 하면 Redis를 호출하고, Redis의 응답을 받기까지의 시간을 의미합니다.

latency1

즉 위의 그림에서 1에서 2까지의 시간을 우리는 보통 latency라고 표현합니다. 그래서 이 시간동안의 timeout을 넘어가면 응답이 느리다고 인식하게 되는 것이죠.

그런데 Redis 입장에서 latency, 즉 slowlog를 측정하는 방법은 좀 다릅니다.
latency2

위의 그림에서 빨간색 3으로 표시된 부분만이 Redis에서 slowlog의 대상이 되는 시간입니다. 소스를 보면 다음과 같습니다. c->cmd->proc 함수를 호출하는 바로 앞과 뒤에서 해당 함수의 수행시간을 재고 있습니다.

void call(client *c, int flags) {
    ......
    start = server.ustime;
    c->cmd->proc(c);
    duration = ustime()-start;

    ......

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats. */
    if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
   
    ......
}

이 상황에서 다시 문제가 되는 부분들을 확인해 봅시다. 크게 두 가지로 나눠집니다.

  • Case A: 클라이언트 입장에서도 latency가 느리고, Redis에서도 느려서 slowlog가 찍힐 만한 상황
  • Case B: 클라이언트 입장에서는 latency가 느리지만, Redis 입장에서는 느리지 않다고 인식하는 경우

먼저 Case A 부터 살펴봅시다. Case A는 Redis에서 위에서 말한 빨간색 3에 해당하는 케이스입니다. Redis의 PrcoessCommand가 느려지는 경우는, O(N)의 명령을 사용하거나, 해당 메모리가 swaping 대상으로 매번 디스크를 접근해야 될 경우입니다. 후자는 메모리가 부족할 때 발생하는 케이스이고, 전자는 대량의 아이템을 한번에 가져가는 케이스, 예를 들어 KEYS 명령을 쓰거나, 굉장히 많은 item이 든 collection의 데이터를 모두 가져오는, 예를 들어, 10만개가 들었는데 10만개를 전부 가져오면 느려집니다. 대표적으로 다음과 같은 명령들이 있습니다.

  1. hgetll
  2. smembers

아니면 이렇게 든 collection을 지울 때 발생하게 됩니다.

그럼 이제 오늘의 주제였던 Case B를 살펴봅시다. 일단 다시 짚고 넘어가야할 것이, Redis는 싱글스레드입니다. 즉, 하나의 명령을 수행하는 동안 다른 명령을 수행하지 못합니다. 그리고, Redis의 네트웍 모듈 역시 이 싱글스레드에 영향을 받습니다.

Redis에서 하나의 명령을 수행하는 과정은, 네트웍 패킷을 클라이언트 마다 epoll 등의 IO multiplexer를 이용해서 읽어들이고, 이게 완성되었다라고 판단이 들면 그 시점에 바로 수행을 해버립니다. 즉, 클라이언트에서 Request를 보냈다고 하더라도, TCP로 인해서 패킷이 분할되서 갈 수도 있고, 한방에 갈수도 있지만, 하나의 Request가 완성이 되었다고 Redis가 판단해야만, 그 때 실행되게 됩니다. 그렇다면, 내가 네트웍으로 Request를 보냈지만, Redis가 다른 명령들의 network 패킷을 읽어드리고 해당 명령어를 처리하는 중이라면, 우리가 보낸 Request는 아직 실행이 되지 않습니다. 즉, 우리가 Request를 보낸 시간은 계속 지나가지만, Redis가 다른 수 많은 명령을 처리한다면, 그리고 그 기간이 길어진다면 위에서 말한 1,2의 시간은 길지만, Redis 입장에서는 하나의 명령을 짧게 처리하면, 자신은 여전히 빨리 처리한다고 생각하므로, slowlog가 남지 않고, 클라이언트에서는 응답이 느려지는 현상이 발생하게 되는 것입니다.

이럴 경우는 Redis에서, 시간이 느린 명령을 최대한 줄이거나(slowlog에 걸릴만큼은 아니지만 느린 명령들), Redis 서버를 늘려서, Requests 자체를 줄이는 방법이 있습니다. 시간이 느린 명령은 info all 명령으로 cmd states를 보시면, 호출 수와 평균 응답시간이 있으니 이거 기준으로 튜닝하시면 됩니다.

그럼 고운 하루되세요.

[입 개발] redis-cli 에서 –rdb는 주의해서 사용하셔야 합니다.

최근에 지인에게서 문의가 왔습니다. 뭔가 특별히 한게 없는데, Redis 서버가 죽어버리고 재시작했다고, 이것저것 알아보니, 일단 메모리를 많이 사용하고 있던 상태였습니다. 웬지 느낌은, 아주 전통적인 Redis가 데이터 저장을 위해서 fork하면서 메모리를 추가로 사용하고, 이걸로 인해서 OOM등이 발생한게 아닐까라고 생각을 했는데…

지인의 얘기로는 다음과 같은 –rdb 명령을 사용했다고 합니다.

redis-cli --rdb

일단 저도 한번도 해당 옵션을 써본적이 없어서 소스를 확인해보니 다음과 같습니다. 해당 소스는 redis-cli.c를 보시면 됩니다.

        } else if (!strcmp(argv[i],"--rdb") && !lastarg) {
            config.getrdb_mode = 1;
            config.rdb_filename = argv[++i];
        }

getrdb_mode라는 옵션을 셋팅합니다. 해당 부분을 다시 찾아보면 getrdb_mode가 켜져있을 때 sendCapa()라는 함수와 getRDB를 호출합니다.

    /* Get RDB mode. */
    if (config.getrdb_mode) {
        if (cliConnect(0) == REDIS_ERR) exit(1);
        sendCapa();
        getRDB(NULL);
    }

sendCapa를 호출해서 eof를 설정하면 diskless replication을 시도합니다. 여기서 주의할 것은 diskless replication은 서버에 RDB파일을 만들지 않을뿐, fork를 해야하는 것은 동일합니다. 그리고 이 부분이 문제를 일으키게 됩니다. 다만 여기서는 실제 RDB를 만들지 않고 해당 클라이언트에게는 diskless replication을 하라고 설정만 하는 단계입니다.(다만 서버가 repl_diskless_sync를 켜두지 않으면 만구땡…)

void sendCapa() {
    sendReplconf("capa", "eof");
}

void sendReplconf(const char* arg1, const char* arg2) {
    printf("sending REPLCONF %s %s\n", arg1, arg2);
    redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);

    /* Handle any error conditions */
    if(reply == NULL) {
        fprintf(stderr, "\nI/O error\n");
        exit(1);
    } else if(reply->type == REDIS_REPLY_ERROR) {
        fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
        /* non fatal, old versions may not support it */
    }
    freeReplyObject(reply);
}

getRDB 함수는 이제 실제로 SYNC 명령을 보내서 replication을 수행합니다. 원래의 Replication은 그 뒤로 계속 master로 부터 데이터를 받아야 하지만, 여기서 getRDB는 그냥 현재 메모리 상태의 스냅샷만 전달하고 종료하게 됩니다.

코드를 보면 sendSync 후에 payload를 가져오게 되는데, 해당 payload는 RDB생성시 diskless_sync 방식이 아닐경우에만 파일 사이즈를 전달해줘서 가져올 수 있고, diskless_sync 방식을 쓸때는 eof_mark 라는 40바이트의 랜덤값을 처음에 전달해주고, 마지막에 해당 값이 나오면 RDB생성을 종료하게 됩니다.

static void getRDB(clusterManagerNode *node) {
    int s, fd;
    char *filename;
    if (node != NULL) {
        assert(node->context);
        s = node->context->fd;
        filename = clusterManagerGetNodeRDBFilename(node);
    } else {
        s = context->fd;
        filename = config.rdb_filename;
    }
    static char eofmark[RDB_EOF_MARK_SIZE];
    static char lastbytes[RDB_EOF_MARK_SIZE];
    static int usemark = 0;
    unsigned long long payload = sendSync(s, eofmark);
    char buf[4096];

    if (payload == 0) {
        payload = ULLONG_MAX;
        memset(lastbytes,0,RDB_EOF_MARK_SIZE);
        usemark = 1;
        fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer "
                "until EOF marker to '%s'\n", filename);
    } else {
        fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
            payload, filename);
    }

    /* Write to file. */
    if (!strcmp(filename,"-")) {
        fd = STDOUT_FILENO;
    } else {
        fd = open(filename, O_CREAT|O_WRONLY, 0644);
        if (fd == -1) {
            fprintf(stderr, "Error opening '%s': %s\n", filename,
                strerror(errno));
            exit(1);
        }
    }

    while(payload) {
        ssize_t nread, nwritten;

        nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
        if (nread = RDB_EOF_MARK_SIZE) {
                memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
            } else {
                int rem = RDB_EOF_MARK_SIZE-nread;
                memmove(lastbytes,lastbytes+nread,rem);
                memcpy(lastbytes+rem,buf,nread);
            }
            if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
                break;
        }
    }
    if (usemark) {
        payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
        if (ftruncate(fd, payload) == -1)
            fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
        fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
    } else {
        fprintf(stderr,"Transfer finished with success.\n");
    }
    close(s); /* Close the file descriptor ASAP as fsync() may take time. */
    fsync(fd);
    close(fd);
    fprintf(stderr,"Transfer finished with success.\n");
    if (node) {
        sdsfree(filename);
        return;
    }
    exit(0);
}

다시 정리하자면 redis-cli에서 –RDB 옵션을 주면 서버에 diskless_sync를 요청합니다.(서버가 설정이 되어있어야만 동작합니다.) 그러나 해당 작업은 Redis 서버를 fork하게 만들기 때문에, 메모리 사용량이 높고 Write가 많은 상황에서는 절대적으로 피하시는게 좋습니다.

[입개발] Spring-data-redis 에서 Jedis로 TLS를 쓰면서 인증서 체크는 Disable 하는 방법

흐음… 최근에 Redis를 TLS로 사용해야 할 일이 생겼습니다.(아시는 분은 아시겠지만, 저는 자바맹, Spring 맹이라…) Spring-data-redis 를 쓰는 서비스에서 TLS를 키는 것은 사실 아주아주아주 쉽습니다. 일단 저희는 spring-data-redis에서 Jedis를 쓰고 있는 상황입니다.(여기서 왜 lettuce 안쓰고 Jedis 쓰냐고 물으시면, 원래 그렇게 만들어져 있었으니까라고 대답을 ㅎㅎㅎ)

그런데 JedisConenctionFactory에서 TLS를 쓰는 건 아주 쉽습니다. 그냥 setUseSsl(true)만 호출하면 그 때부터 TLS가 딱!!! 됩니다.

딱!!! 되는데 제대로 접속하기 위해서는 인증서가 제대로 된 위치에 존재해야 합니다. 그런데, 내부 레디스 서버마다 인증서를 만들기는 귀찮을 수 있습니다. 그럴때는 살포시… 인증서를 무시해주면 되는데…

구글링을 해봐도, Spring Data Redis에 TLS를 적용하는 방법은 많이 나옵니다. 또, 인증서 체크를 Disable 하는 방법도 꽤 많이 나옵니다. 그런데 인증서 체크를 끄는 법은 다 HTTPS 관련 Rest Template 설정에 관한 것들입니다. 즉 아주 편하게 할 수 있는 Spring Data Redis 에서 TLS를 인증서 체크를 끄는 방법이 검색해도 안나오더라는…(아마도 제가 못 찾은걸 확신합니다만…)

일단 기본적인 방법은 다음과 같습니다.(다음 StackOverflow 를 참고합니다.)

  1. TrustManager를 생성한다.
  2. SSLContext를 가져온다.
  3. SSLContext의 init를 아까 생성한 TrushManager를 이용하도록 설정한다.
  4. 해당 SSLContext의 SSLSocketFactory를 이용하도록 설정한다.
import javax.net.ssl.*;
import java.security.*;
import java.security.cert.X509Certificate;

public final class SSLUtil{

    private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[]{
            new X509TrustManager() {
                public java.security.cert.X509Certificate[] getAcceptedIssuers(){
                    return null;
                }
                public void checkClientTrusted( X509Certificate[] certs, String authType ){}
                public void checkServerTrusted( X509Certificate[] certs, String authType ){}
            }
        };

    public  static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
        // Install the all-trusting trust manager
        final SSLContext sc = SSLContext.getInstance("SSL");
        sc.init( null, UNQUESTIONING_TRUST_MANAGER, null );
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
    }

    public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
        // Return it to the initial state (discovered by reflection, now hardcoded)
        SSLContext.getInstance("SSL").init( null, null, null );
    }

    private SSLUtil(){
        throw new UnsupportedOperationException( "Do not instantiate libraries.");
    }
}

위의 코드는 HttpsURLConnection 을 위한 SSL 인증서 체크를 무시하는 방법입니다. 다만, 우리는 이걸 보면, 아 Spring Data Redis의 JedisConnectionFactory 도 유사하다고 추측할 수 있습니다. 실제로 JedisConnectionFactory 코드를 보면 SSLSocketFactory를 가져오는 함수는 보이지 않습니다.

조금 더 파보면 JedisConnectionFactory 안에는 clientConfiguration 변수가 있고, 다음과 같은 setSslSocketFactory, getSslSocketFactory 와 같은 함수들이 보입니다.

static class MutableJedisClientConfiguration implements JedisClientConfiguration {
......
		@Override
		public Optional getSslSocketFactory() {
			return Optional.ofNullable(sslSocketFactory);
		}

		public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
			this.sslSocketFactory = sslSocketFactory;
		}

		/*
		 * (non-Javadoc)
		 * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters()
		 */
		@Override
		public Optional getSslParameters() {
			return Optional.ofNullable(sslParameters);
		}

		public void setSslParameters(SSLParameters sslParameters) {
			this.sslParameters = sslParameters;
		}
......
}

그리고 createJedis 같은 함수를 보면 아래와 같이 Jedis 인스턴스를 생성할 때 clientConfiguration의 getSslSocketFactory 함수를 쓰고 있는걸 볼 수 있습니다.

	private Jedis createJedis() {

		if (providedShardInfo) {
			return new Jedis(getShardInfo());
		}

		Jedis jedis = new Jedis(getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), isUseSsl(),
				clientConfiguration.getSslSocketFactory().orElse(null), //
				clientConfiguration.getSslParameters().orElse(null), //
				clientConfiguration.getHostnameVerifier().orElse(null));

		Client client = jedis.getClient();

		getRedisPassword().map(String::new).ifPresent(client::setPassword);
		client.setDb(getDatabase());

		return jedis;
	}

오호 이제 저 setSslSocketFactory 함수를 통해서 아까 얻은 SSLContext의 SSLSocketFactory 로 바꿔주면 될듯 합니다. 그런데 아주 사소한 문제가 있습니다. 어떻게 JedisConnectionFactory에서 저 값을 바꿀 수 있을까요? 살짝 살펴보니 오오 다음과 같은 함수가 존재합니다.

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
        ......
        public JedisClientConfiguration getClientConfiguration() {
		return clientConfiguration;
	}
        ......
}

그런데, 오예!! 하면서 받아서 setSslSocketFactory를 호출해보려고 하면…. 문제가 발생합니다. 그것은!!!, JedisClientConfiguration 이 interface 인데… ReadOnly Interface라는 것입니다. 대략 다음과 같습니다.

public interface JedisClientConfiguration {
	boolean isUseSsl();
	Optional getSslSocketFactory();
	Optional getSslParameters();
	Optional getHostnameVerifier();
	boolean isUsePooling();
	Optional getPoolConfig();
	Optional getClientName();
	Duration getConnectTimeout();
	Duration getReadTimeout();
        ......
}

흐음, 그러면 어차피 내부는 MutableJedisClientConfiguration 클래스이니, 그냥 강제 형변환해버리면 되지 않을까요? 일단 제가 자바에 깊은 지식이 없어서… 실패했을 수도 있지만, Inner Class 고, 같은 패키지가 아니면 사용할 수가 없습니다. 그럼 무슨 말이야… 앞에 시도한 방법은 모두 실패… 우리는 이상한 산으로 가고 있던 것입니다.

그럼 어떻게 해야 하며, 해당 소스를 살펴보니, 다행히 다음과 같은 생성자가 있습니다. 잘 살펴보면 RedisStandaloneConfiguration 과 JedisClientConfiguration 을 생성자로 받고 있습니다.

public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfig, JedisClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null!");

		this.standaloneConfig = standaloneConfig;
	}

이제 대충 방법이 떠오르시나요? 즉, 다음과 같이 JedisClientConfiguration 을 상속받는 다른 클래스를 만들어서 JedisConnectionFactory 의 생성자로 넘기면, 우리가 원하는 동작을 하도록 만들 수가 있습니다.

대략 저는 다음과 같은 JedisSSLClientConfiguration 이라는 클라스를 만들었습니다. 그리고 JedisConnectionFactory 에 넘겨주니, 인증서 체크를 잘 회피하면서 동작하게 되었습니다. 역시 소스를 보면 길이 보이는 경우가 종종 있네요. 자바를 잘하시는 분들이 부럽습니다. 전 맨날 삽질만…

    JedisSSLClientConfiguration() {
        setUsePooling(true);
        setUseSsl(true);

        TrustManager[] trustAllCerts = new TrustManager[] {
                new X509TrustManager() {
                    public java.security.cert.X509Certificate[] getAcceptedIssuers()
                    {
                        return new X509Certificate[0];
                    }
                    public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    {
                    }
                    public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    {}
                }
        };

        try {
            SSLContext sc = SSLContext.getInstance("TLS");
            sc.init(null, trustAllCerts, null);
            sslSocketFactory = sc.getSocketFactory();
            sslParameters = new SSLParameters();
            sslParameters.setEndpointIdentificationAlgorithm("");
        } catch (Exception e) {
            throw new RuntimeException(e.toString());
        }

        hostnameVerifier = new HostnameVerifier() {
            @Override
            public boolean verify(String s, SSLSession sslSession) {
                return true;
            }
        };
    }

[입 개발] Redis 버그 – Dataset 사이즈가 200GB가 넘어가면 죽는다구요?

오늘은 최근에 이슈가 되었던, Redis 버그에 대해서 분석해보는 시간을 가지도록 하겠습니다.
해당 이슈는 Redis issue 4493 를 보시면 됩니다.

실제로 해당 이슈는 2017년 11월 30일에 올라왔습니다. 실제로 현재 버전에는 다 패치가 되어있습니다. 그러나 DBMS나 캐시등의 툴은 큰 보안버그가 없는 이상 업데이트가 굉장히 느립니다.(성능에 영향을 주기 때문에…)

다른 건 일단 모두 제외하고 어떤 이슈가 있고, 어떻게 패치되었는지 확인해보도록 하겠습니다.

일단 데이터가 200GB가 넘어가면 죽는다는 뭔가 쉽게이해하기 어려운 상황입니다. Redis 는 기본적으로 Key에 512MB까지 그리고 Value에 512MB가 할당되고, 메모리가 넘치기 전까지는 저장이 되어야 합니다. 그런데 보통 특정 사이즈가 문제가 되는것은, 누구나 예측이 되는 아주 간단한 이유가 있습니다. 바로 overflow 나 underflow, 이런 생각을 가지고 가면, 문제가 좀 더 해결하기 쉽지만, 갑자기 Redis가 죽으면 알기가 힘들죠.(이건 다, 우리는 현재 모든 정보를 알고 있기 때문에 쉽게 이해를 할 수 있는…)

Redis 는 보통 죽기 전에 자신의 정보를 뿌리고 죽는데, 위의 링크를 보시면 다음과 같은 정보들이 있습니다. 우와 755GB 메모리를 가진 장비에 211GB의 메모리를 쓰고 있네요. 부럽습니다. 사실 메모리 영역만 보면 사실 큰 문제가 될께 없어보입니다.

Memory
used_memory:226926628616
used_memory_human:211.34G
used_memory_rss:197138104320
used_memory_rss_human:183.60G
used_memory_peak:226926628616
used_memory_peak_human:211.34G
used_memory_peak_perc:117.84%
used_memory_overhead:137439490190
used_memory_startup:486968
used_memory_dataset:89487138426
used_memory_dataset_perc:39.43%
total_system_memory:811160305664
total_system_memory_human:755.45G

Keyspace
db2:keys=2147483651,expires=0,avg_ttl=0

그런데 Keyspace를 보니 조금 다르네요. db2의 key가 2147483651개가 있습니다. 흐음…

다음과 같이 signed 변수들의 범위를 보면

      char : 127
      short : 32767
      int : 2147483647

입니다. 흐음 일단 key의 개수가 int의 범위를 넘어갔네요. 2147483651을 음수로 바꾸면
-2147483645 이 됩니다. 흐음… 이렇게 바뀌면 문제가 발생할 수도 있겠네요. 그런데 뭔가 이상합니다.

분명히 해당 info정보에서는 제대로 2147483651 가 나와있는데요?
해당 정보를 출력하는 info Command는 실제로 genRedisInfoString 라는 함수를 이용합니다.
아래 코드를 보면 keys, vkeys는 long long 입니다. 그걸 사용하고 있는 dictSize 함수는 매크로로 그냥 값을 가져옵니다.
실제로 redisDb 구조체는 dict를 가지고 있고 dict는 다시 dictht 라는 해시 테이블을 가지고 있습니다. 거기서 used 변수를
가져오는게 dictSize 함수입니다.

#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

sds genRedisInfoString(char *section) {
    sds info = sdsempty();
    ......

    /* Key space */
    if (allsections || defsections || !strcasecmp(section,"keyspace")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info, "# Keyspace\r\n");
        for (j = 0; j < server.dbnum; j++) {
            long long keys, vkeys;

            keys = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (keys || vkeys) {
                info = sdscatprintf(info,
                    "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
                    j, keys, vkeys, server.db[j].avg_ttl);
            }
        }
    }
    return info;
}

그런데 used라는 변수는 unsigned long 입니다. 32bit에서는 문제가 될 수 있지만, 64bit에서는 8바이트가 할당되는 변수입니다.
64bit 에서 다음과 같은 코드를 돌려보시면 알 수 있습니다.

#include

int main(int argc, char *argv[]) {
printf(“%d\n”, sizeof(unsigned long));
return 0;
}

그럼 제대로 64bit가 되어있는데 무엇이 문제인가!!!, 처음부터 다 분석하면 어려우니 해당 문제를 일으키는 곳을 바로 확인해 보겠습니다. 코멘트를 자세히 읽어보면… 실제로 아이템을 추가할 때가 문제가 됩니다. 다시 앞으로 돌아가서 아이템 개수가 int의 범위를 넘어갔다는 것을 기억해둡니다.

다음 dictAddRaw 함수를 보면 index가 int 입니다. 헉… 바로 눈치 채시겠죠. 저기 int 로 인해서 아래의 ht->table[index]
라는 코드가 overflow 로 음수가 들어가게 됩니다. 바로 Redis는 저세상으로…

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry; /* used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

그럼 해당 실제 patch는 어떻게 적용되었을까요?

해당 코드를 보시면 해당 int는 long으로, 그 외에 많은 부분들이 64bit unsigned 또는 signed 로 변경된것을 볼 수 있습니다.
실제로 해당 PR은 두개로 나뉘어져 각각 반영되었고, Redis 4.0.7에 다음 두개의 commits으로 볼 수 있습니다.
dict: fix the int problem for defrag

dict: fix the int problem

즉 해당 버그를 피하실려면 최소한 4.0.7 이후의 버전을 선택하셔야 하고 가능하면 최신 버전을 고르시는 걸 추천드립니다.
그런데 21억개를 넘어가는 아이템이라… 웬만한 규모에서는 발생하지 않을 문제겠지만, 엄청 많은 데이터를 쓰는 곳에서만 발생할 수 있었던 문제로 보입니다.