[입 개발] Spark Structured Streaming 에서 Offset 은 어떻게 관리되는가(아주 간략한 버전)?

DStream 만 열심히 쓰다가, Structured Streaming 을 강제로 써야할 일이 생겼습니다. 그런데 DStream을 쓸 때는 Offset 을 명확하게 가져오거나 볼 수 있었는데, Structured Streaming 을 보니, 아는게 없었습니다. -_-;;; 분명히 까만건 글씨고 하얀건 바탕인데…

먼저 Structured Streaming 에서 offset 을 지정하는 방법은 startingOffsets 을 지정하는 방법이 있습니다. 보통 “earliest” 와 “latest” 를 지정하지만, 시작 Offset을 명시적으로 지정할 수도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", "earliest") //earliest, latest
      .load()

earliest 는 현재 Kafka에서 해당 토픽이 가지고 있는 가장 빠른 offset, latest 는 가장 마지막 offset 입니다. 다음과 같이 Offset 을 명시적으로 지정할 수 도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", """{"test_logs": {"0": 100, "1": 200}""")
      .load()

일단 위의 offset 지정 방법은 checkpoint가 생성되기 전에 최초에만 지정되는 형태입니다. 즉 checkpoint 가 생성되면 위의 staringOffsets는 checkpoint 의 값을 사용하게 됩니다.

일단 Structured Streaming 을 하면 checkpoint 를 사용하게 되어 있습니다. checkpoint 를 설정하면, 다음과 같은 기본 디렉토리 들이 생깁니다.

  • commits – 완료된 batchId 가 저장된다.
    • commits/0, commits/1 이런식으로 파일이 저장된다.
  • metadata
    • 잘 모릅니다.
  • offsets
    • 실제 offsets 이 저장됩니다.
    • offsets/0, offsets/1, offsets/2 이런식으로 Offset 이 저장되어 있다.
  • sources
    • 잘 모릅니다.

일단 잘 모르는 것들을 제외하고 나서 남아있는 offsets 와 commits 입니다. 여기서 offsets 안에는 batchId 가 있는데, 이 파일 안에 실제로 offset이 들어가 있습니다. 다음과 같이 버전 {정보} {오프셋} 형태로 들어가 있습니다. 실제 오프셋은 {topic_name: {partitionId: offset} 구조 입니다. 마지막줄의 내용이, 위에서 보았던, Specific Offset 지정 방법과 동일한 형태인 것을 볼 수 있습니다.

v1
{“batchWatermarkMs”:0,”batchTimestampMs”:1615293540745,”conf”:{“spark.sql.streaming.stateStore.providerClass”:”org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,”spark.sql.streaming.join.stateFormatVersion”:”2″,”spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion”:”2″,”spark.sql.streaming.multipleWatermarkPolicy”:”min”,”spark.sql.streaming.aggregation.stateFormatVersion”:”2″,”spark.sql.shuffle.partitions”:”200″}}
{“test_logs”:{“0″:1058826043,”1″:1058853622,”2”:1058871214}}

실제로 이 offset 을 남기는 부분은 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceInitialOffsetWriter.scala 파일을 보시면 됩니다.

여기서 Offset 이 어떻게 관리되고 처리되는지 살짝 알아보도록 하겠습니다. 먼저 offsets/{nextBatchId} 로 파일이 먼저 생성됩니다. 이 파일이 있으면 현재 offset 부터 nextBatchId 에 있는 offset 까지 처리가 되게 됩니다. 해당 파일이 없으면 latest offset까지가 다음 offset 이 되면서 해당 파일이 만들어지게 됩니다. nextBatchId 는 일종의 WAL(Write ahead Log) 의 역할을 합니다.

그리고 하나의 MicroBatch 가 완료되면 commits 에 해당 batchId 가 생성되면서 완료를 확인합니다. 즉 commits/3 까지 있고 offsets/3 이 있다면, batchId 3번 까지 완료가 되었다는 뜻입니다.

그럼 이제 실제로 예를 한번 들어보겠습니다.

startingOffsets 를 earliest 로 설정하고 해당 값이 100 이고 처음 batch가 시작된다고 하면 offsets/0 파일에 {“test_logs”:{“0”: 200}} 이렇게 저장되어 있다면, 첫 배치에서 Offset 100 ~ Offset 199 까지 처리하게 됩니다. 그래서 혹시나 Offset 을 조작할 일이 있다면, 처리된 commits/{batchId} 를 지우거나, N번째와 N-1번째를 수정해주면 동작할 것 같습니다.(지우기는 해봤는데… 막 수정까지는 하지 안해봤습니다. 다만 Validation 검증 코드가 없는 걸로 봐서 동작할꺼 같습니다. 둘 다 바꾸기는 귀찮아서…)

처음에 살짝 당황했던건 offsets/0 에 있는 값이 실제로 준 offset 보다 이후 값이었는데, 왜 그런지 몰라서 한참 고민을 했습니다. offsets/0 에 있는 값이 DStream 의 untilOffset과 유사하다고 보시면 됩니다.

그리고 여기 offsets 안에 저장하는 값은 KafkaSourceOffset 과 KafkaSourceInitialOffsetWriter(HDFSMetadataLog) 을 보시면됩니다. RateLimit 가 설정이 되면 ./external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala 의 latestOffset 함수를 보면 설정에 따라서 다음 offset이 설정이 됩니다.

삽질(실험?) 과 소스를 보면서 이해한 Offset에 대한 아주 간략한 내용입니다. 위의 설명을 보면 아시겠지만, 위의 내용은 거의 Kafka 전용이고, 다른 것들은 내용이 다를 것으로 보입니다.

[입 개발] Kafka 와 Spark Structured Streaming 에서 checkpoint 에서 아주 과거의 Offset이 있으면 어떻게 동작할까?

최근에 Kafka와 Spark Structured Streaming 코드를 작성할 일이 있어서 작업을 하는데, 이상한 에러를 만났습니다. 그렇습니다. 여러분 저는 Spark Structured Streaming 이 처음이라… 흑 초초보의 슬픔이네요.

에러는 다음과 같았습니다. empty 인 DataFrame을 쓸 수 없다는 것인데요.

21/03/05 07:14:04 ERROR MicroBatchExecution: Query [id = c80a0c8a-b6fe-4961-ae6c-a79a880cb369, runId = e9993a73-6579-4cd9-9ab4-fdbd2995ae6d] terminated with error
org.apache.spark.sql.AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         ;
	at org.apache.spark.sql.execution.datasources.DataSource$.validateSchema(DataSource.scala:950)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:595)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:294)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:884)

분명히 카프카에는 데이터가 계속 들어오고 있는 상황인데, 이유를 알 수 없어서, 몇 시간을 날렸는데… 바꾼 다른 코드의 문제인가 싶어서 이것저것 바꿔보고, 컴파일 And 실행의 반복만…(플랫폼을 EMR에서 Databricks로 그리고 Spark 2.x 에서 3.0.1로 바꾸고 Scala 버전도 2.11 에서 2.12로 바뀌면서 이래저래 이슈가 좀 있었는데… 그 탓인줄…)

그러다가 우연히… CheckPoint를 지우고 나니… 정상적으로 실행이 잘 되는것이었습니다. 그 때 든 생각은 아, 뭔가 Offset에 문제가 있구나라는 생각이 들었습니다. 사실 이 문제가 발생하는 조건을 분석한 다음에 보면, 어떻게 보면 당연한 문제인데요. 문제가 되었던 상황은 같은 토픽으로 checkpoint 가 만들어졌는데, 제가 게을러서 Kafka Retention 기간이 지나서 다시 실행했던 것이었습니다.

즉 문제 상황은, 이미 checkpoint 에는 그 시점의 Offset 이 들어가 있는데, Kafka에서는 이미 Retention 정책으로 인해서 그 부분의 데이터는 없어진 상황인거죠.(checkpoint는 지정한 위치의 offsets 디렉토리 안에 batchId가 생기고 그 안에 있습니다.)

그런데 기본적으로 Spark에서 Kafka가 Offset 을 아주 과거로 주면, earlist 로 동작하게 되어있습니다. (소스는 보다가 아직 이해가 안되서 이걸 정확하게 설명할 정도로 보지는 못했습니다. 나중에 이해되면 추가할께요 관심이 있으시면, 다음 코드들을 보시면 좋을듯 합니다.





sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

그런데 이게 CheckPoint 안에 들어가 있는 Offset 은 그냥 그대로 처리하는 것으로 보입니다. 그럼 결론부터 생각하면 이게 문제가 되는가? 라고 하면, 큰 문제는 아닙니다. 위에서 에러가 발생한 것은 데이터가 0인 DataFrame 을 쓸려고 해서 생긴문제입니다.

다시 문제로 돌아가서 그렇다면 위와 같은 상황에서 데이터는 어떻게 처리가 될까요? 일단 조건으로 백프레셔 설정은 없다고 가정합니다. 그리고 checkpoint 에 저장되어 있던 값이 10001 이고 Kafka Log Retention 에 의해서 중간에 10001~70000 까지가 사라지고,현재 earliest가 70001 이라고 가정합니다. 그리고 현재 latest 는 100000 이라고 하겠습니다. 그러면 다음과 같이 MicroBatch 가 발생하게 됩니다. 그러면 다음과 같이 처리가 되게 됩니다.

BatchIDCount비고
N0존재하지 않는 10001~70000 까지의 처리량
N+130000earliest 부터 현재 시간까지 들어온 양 30000
N+2들어오는 만큼원래 스트리밍에서 처리해야 할 양

다행인 부분은 첫 부분에서 0으로 처리가 되기는 하지만, 그 다음 부터는 다시 정상(?)적으로 커버가 되게 된다는 점입니다.

다시 결론부터 얘기하자면, 위의 상황은 정상적으로 생각하면 발생하면 안되는 상황입니다. Kafka Retention 정책보다 뒤에 처리 스트리밍 코드를 돌린다는 것은 결국 Data Loss 상황인데, 이건 어떻게 보면 심각한 문제입니다. 저 같은 경우에는 개발 과정에서, 휴가 다녀온 다음에 실행해서 발생한 이슈였습니다. 물론 지식이 부족하기도 하지만요. 그래도 덕분에 카프카와 Spark Structured Streaming 에서 offset 이 어떻게 처리되는지 살짝 속내를 볼 수 있었네요.(휴, 왜 저만 이런일들이 벌어지는지…)

[입 개발] HikariCP 는 왜 나를 물먹이는가…

HikariCP는 왜 저를 물먹이는걸까요?… 정답은 제가 못나서 입니다. 흑흑흑, 오늘은 HikariCP를 사용하다가 겪는 일반적인 상황과는 전혀 상관없이 그냥 제가 겪은… 삽질을 공유하려고 합니다.

HikariCP를 아주 특이하게 사용하고 있었습니다. 그런데, 문제는 제가 자바맹이라서 HikariCP에 대한 지식이 일도 없다는 거죠. 히카리는 빛이라는 일본어라는 것만 알고 있습니다.(역시 H2!!!)

코드를 간략하게 정리하면 다음과 같은 형태로 사용하고 있었습니다. write가 가능한 DB 유저와 read가 가능한 DB 유저로 구분해서 사용하고 있었습니다. 아래와 같은 구조는 지양하셔야 합니다. 몰랐던거라…

    val dbConf = new HikariConfig()
    dbConf.setUsername(writeUser)
    dbConf.setPassword(writeUserPass)
    dbConf.addDataSourceProperty("user", readUser)
    dbConf.addDataSourceProperty("password", readUserPass)

그래서 read만 필요한 경우에 다음과 같이 DataSourceProperties를 넘겨서 readUser로 사용하는 케이스 였습니다.

dbConf.getDataSourceProperties

이것 자체가 올바른 사용방법은 아닌데, 이 방법을 혹시나 쓰신다면 HikariCP 3.2.0 이상에서부터만 가능합니다.

그런데 이상하긴 하지만, 이렇게 아주 잘 사용하고 있었습니다. 그런데 갑자기 다른 플랫폼으로 이전을 했더니, 기존에 잘 쓰던 writeUser 대신에 readUser로 접속을 하면서, 기존에 사용하던 것들이 모두 실패하기 시작했습니다.

일단 다음과 같은 형태로 움직이기 시작했습니다.

  1. 각 DB 계정의 권한이 명확한지 확인!
  2. 특정 버전 사이의 코드 변경 확인
  3. 각 플랫폼의 Library 버전 확인

각 DB의 계정은 명확했습니다. 메뉴얼 한 테스트 및, show grants for ‘writeUser’ 를 해봐도 명확하게 기 설정된 정보가 그대로 존재했었습니다. 특정 버전 사이의 코드도 전과 바뀐 것이 없는 것을 확인하자. 이제 생각은 아 HikariCP의 버전이 다르겠구나라는 생각이 들었습니다.

생각이 거기까지 들자, 버전을 확인해보니, 제가 쓰던 버전은 3.4.2 였고, 새로운 플랫폼의 버전은 3.1.0이었습니다. 그런데, 자바맹이다 보니 이걸 당장 분석할 시간은 없고, 일단은 뭔가 꽁수로 처리를 하고, 해당 건은 나중에 자세히 알아봐야지 할려고 하다가… 좀 시간이 나서 HikariCP를 보기 시작했습니다. 사실 3.1.0 -> 3.4.2 사이에 엄청 큰 변화는 제가 본 코드에는 없었습니다.(사실, README도 안보고, 그냥 쭈루룩 넘어간…)

그런데 그러면 왜 이런 문제가 생긴것일까요? 이걸 제대로 이해하기 위해서는 일단 간단히 이 부분에 대한 HikariCP의 구조를 알아야 합니다. 위의 소스에서 보듯이 configuration을 저장하고 있는 것은 HikariConfig 입니다. 그리고 다음과 같이 DataSource를 만들어서 사용하고 있습니다.

val dbConf = HikariConfig()
val ds = new HikariDataSource(dbConf)

그리고 필요한 connection 은 다음과 같이 DataSource 의 getConnection()을 호출합니다.

val connection = ds.getConnection()

그럼 이제 실제 DataSource의 getConnection 이 어떻게 동작하는지 살펴보도록 하겠습니다. 우리가 볼 것은 HikariDataSource인걸 그 위의 소스에서 알 수 있습니다. 그런데 HikariDataSource의 getConnection 은 아주 복잡합니다.(제 기준에서요.)

   @Override
   public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }

히카리한 코드입니다. 자세한 내용은 모르겠고, 일단 pool 이 없으면 pool을 생성합니다. new HikariPool(this) 이라는 코드가 보이네요. 그런데 요 생성자를 따라가보면, 많은 것을 하고 있습니다. 일단 생성자를 따라가기 전에 먼저 HikariPool의 구조를 보시면 PoolBase 라는 것을 상속 받고 있습니다. 나머지는 일단 패스~~~~(모르는건, 아는척 넘어갑시다.)

public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateListener {
}

그리고 실제 HikariPool의 생성자를 살펴봅니다. 중요한 부분이 많지만 과감하게 날려버립니다.

public HikariPool(final HikariConfig config)
   {
      super(config);

      this.connectionBag = new ConcurrentBag<>(this);
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

      checkFailFast();

      ......
   }

이제 checkFailFast() 라는 함수가 보입니다. 여기서 보시면 PoolEntry를 createPoolEntry()라는 함수로 생성하고, 성공하면, connectionBag에 넣어주고 끝입니다.

private void checkFailFast()
   {
      final long initializationTimeout = config.getInitializationFailTimeout();
      if (initializationTimeout < 0) {
         return;
      }

      final long startTime = currentTime();
      do {
         final PoolEntry poolEntry = createPoolEntry();
         if (poolEntry != null) {
            if (config.getMinimumIdle() > 0) {
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
            }
            else {
               quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
            }

            return;
         }

         if (getLastConnectionFailure() instanceof ConnectionSetupException) {
            throwPoolInitializationException(getLastConnectionFailure().getCause());
         }

         quietlySleep(SECONDS.toMillis(1));
      } while (elapsedMillis(startTime) < initializationTimeout);

      if (initializationTimeout > 0) {
         throwPoolInitializationException(getLastConnectionFailure());
      }
   }

이 뒤에서 뭔가 동작이 일어나면 복잡하겠지만, 일단 createPoolEntry() 함수를 봅니다. 전에 절 괴롭혔던(전, 항상 괴롭힘을 당합니다. MaxLifeTime 값을 이용하는 것이 보이지만, 우리의 관심은 그쪽은 아닙니다.

   private PoolEntry createPoolEntry()
   {
      try {
         final PoolEntry poolEntry = newPoolEntry();

         final long maxLifetime = config.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
            final long lifetime = maxLifetime - variance;
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(connectionBag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }

         return poolEntry;
      }
      catch (ConnectionSetupException e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());
            lastConnectionFailure.set(e);
         }
      }
      catch (Exception e) {
         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
            logger.debug("{} - Cannot acquire connection from data source", poolName, e);
         }
      }

      return null;
   }

다시 newPoolEntry() 함수를 따라갑니다. 좀 딥하게 내려가지만, 일차선 도로입니다. 빠질 곳이 없지요.

   PoolEntry newPoolEntry() throws Exception
   {
      return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
   }

newPoolEntry는 쉽습니다. newConnection()만 보면 되겠네요. newConnection() 함수에서는 밑에 connection 부분만 보시면 됩니다. username 과 password를 config(HikariConfig 입니다.) 에서 가져와서, username == null 이면, dataSource의 getConnection() 을 없으면 dataSource의 getConnection(username, password)를 호출합니다.

   private Connection newConnection() throws Exception
   {
      final long start = currentTime();

      Connection connection = null;
      try {
         String username = config.getUsername();
         String password = config.getPassword();

         connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
         if (connection == null) {
            throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
         }

         setupConnection(connection);
         lastConnectionFailure.set(null);
         return connection;
      }
      catch (Exception e) {
         if (connection != null) {
            quietlyCloseConnection(connection, "(Failed to create/setup connection)");
         }
         else if (getLastConnectionFailure() == null) {
            logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());
         }

         lastConnectionFailure.set(e);
         throw e;
      }
      finally {
         // tracker will be null during failFast check
         if (metricsTracker != null) {
            metricsTracker.recordConnectionCreated(elapsedMillis(start));
         }
      }
   }

그럼 이 dataSource는 어디서 온 것일까요? 정체는 무엇일까요? 그걸 확인할려면 아까 HikariPool의 생성자에 있던 super(config); 구문을 따라가야 합니다. HikariPool은 뭘 상속 받았죠? 네! 그렇습니다. PoolBase 로 가봅시다.(그런데 사실 아까 newPoolEntry 부터 이미 PoolBase 였다는 사실은 안 비밀입니다.)

PoolBase(final HikariConfig config)
   {
      this.config = config;

      ......
      initializeDataSource();
   }

PoolBase의 생성자는 크게 딴게 없이 HikariConfig을 config으로 저장하고 initializeDataSource()를 호출하게 됩니다. 이 함수가 재미있는데, driverClass가 설정되었는지 jdbcUrl이 설정되었는지에 따라서 다르게 동작하는데. 일단 처음에는 DataSource가 내부적으로 없기 때문에 생성해야 합니다.

   private void initializeDataSource()
   {
      final String jdbcUrl = config.getJdbcUrl();
      final String username = config.getUsername();
      final String password = config.getPassword();
      final String dsClassName = config.getDataSourceClassName();
      final String driverClassName = config.getDriverClassName();
      final String dataSourceJNDI = config.getDataSourceJNDI();
      final Properties dataSourceProperties = config.getDataSourceProperties();

      DataSource ds = config.getDataSource();
      if (dsClassName != null && ds == null) {
         ds = createInstance(dsClassName, DataSource.class);
         PropertyElf.setTargetFromProperties(ds, dataSourceProperties);
      }
      else if (jdbcUrl != null && ds == null) {
         ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password);
      }
      else if (dataSourceJNDI != null && ds == null) {
         try {
            InitialContext ic = new InitialContext();
            ds = (DataSource) ic.lookup(dataSourceJNDI);
         } catch (NamingException e) {
            throw new PoolInitializationException(e);
         }
      }

      if (ds != null) {
         setLoginTimeout(ds);
         createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl);
      }

      this.dataSource = ds;
   }

ds를 생성하는 DriverDataSource 부분을 확인해 봅시다. 파라매터로 config 의 username 과 password가 넘어갑니다.

   public DriverDataSource(String jdbcUrl, String driverClassName, Properties properties, String username, String password)
   {
      this.jdbcUrl = jdbcUrl;
      this.driverProperties = new Properties();

      for (Entry<Object, Object> entry : properties.entrySet()) {
         driverProperties.setProperty(entry.getKey().toString(), entry.getValue().toString());
      }

      if (username != null) {
         driverProperties.put(USER, driverProperties.getProperty("user", username));
      }
      if (password != null) {
         driverProperties.put(PASSWORD, driverProperties.getProperty("password", password));
      }

      ......
   }

중요하지 않은 부분을 다 날리고 보면, 저는 처음에 버그인줄 알았는데, username, password가 null 이 아닐 때, properties 에 “user”, 와 “password”가 있다면, 가지고 있는 username, password를 디폴트 값으로 셋팅합니다. 즉 property에 “user”, “password”가 설정되어 있다면, 이걸 무조건 우선적으로 쓰게 됩니다. 네네, 즉 둘 다 셋팅이 되면, 여기서 driverProperties는 값을 덮어씌우게 됩니다. 올레!!!, 그렇지 이 부분입니다!!! 라고 말하고 싶지만… 이 코드는 3.1.0 도 동일합니다. 원래 코드를 한번 다시 살펴보시죠.

    val dbConf = new HikariConfig()
    dbConf.setUsername(writeUser)
    dbConf.setPassword(writeUserPass)
    dbConf.addDataSourceProperty("user", readUser)
    dbConf.addDataSourceProperty("password", readUserPass)

그렇습니다. 우리의 코드는 처음부터 둘 다 쓰고 있었습니다. 그런데 위의 부분을 보면… 무조건 readUser, readUserPass 가 writeUser, writeUserPass로 설정된 값을 덮어써야만 합니다. 즉, 이전에도 동작을 안해야 정상이라는 겁니다. 여기서 사실 1차 좌절을 하게 됩니다. “왜 안되지!!!” 보다 더 무서운 “왜 되지!!!” 만나게 된겁니다. 저도 멘붕에 다시 한번 빠집니다.

그런데 여기서 아까 살짝 놓치고 지나간 부분이 생각났습니다. 아마도 여러분들은 다 맞추셨을꺼 같습니다. 전 초초초초초초초보 개발자라 T.T

혹시 아까 newConnection() 함수가 기억나시나요? 다시 가져오면 아래와 같습니다. 실제로 driverProperties가 설정되는 것은 위와 같지만… 우리가 사용하는 것은 아래 함수입니다.

connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);

오오 이 부분이 다르지 않을까 하고 3.1.0 소스를 봤더니!!! 당연하게도!!!, 똑 같습니다.(어어어, 이게 아닌데…)

음… 음.. 음… 다시 또 2차 좌절을 하는데…

dataSource 의 getConnection 함수가 다릅니다. 덜덜덜 아까 우리가 봤던 dataSource 는 DriverDataSource 입니다.

3.4.0 – cloned Properties를 만들고 여기에 user 와 password 를 넘겨 받은 값으로 셋팅합니다. null 이면 그냥 이전값 사용.

   public Connection getConnection(final String username, final String password) throws SQLException
   {
      final Properties cloned = (Properties) driverProperties.clone();
      if (username != null) {
         cloned.put("user", username);
         if (cloned.containsKey("username")) {
            cloned.put("username", username);
         }
      }
      if (password != null) {
         cloned.put("password", password);
      }

      return driver.connect(jdbcUrl, cloned);
   }

3.1.0 – 그냥 원래의 getConnection()을 그대로 호출합니다. 이건 뭥미!!!

   public Connection getConnection(String username, String password) throws SQLException
   {
      return getConnection();
   }

실제의 getConnection() 함수는 둘다 다음과 같이 동일합니다. 아까 우리가 만든 driverProperties 를 사용하고 있습니다.

   public Connection getConnection() throws SQLException
   {
      return driver.connect(jdbcUrl, driverProperties);
   }

결론적으로, getConnection함수에서 username, password를 처리하는 부분이 바뀌었기 때문에 3.4.0 에서는 의도한 대로 둘 다 동작을 했지만, 3.1.0 에서는 제대로 동작하지 않았던 것입니다. 자세한 패치는 https://github.com/brettwooldridge/HikariCP/commit/851e2d4592b52a9c367ada2c76f013b1d4e20ac3 를 보시면 됩니다.

코드를 보면 HikariCP의 개발자는 제가 사용한 것과 같은 방법을 쓸 거라고는 생각은 안했고, 이게 맞는 방향이 아니었던 것입니다. 흑… 역시 모르면 삽질하네요 T.T

[책 리뷰] 실전 대비 C 알고리즘 인터뷰 – 길벗

해당 리뷰는, 길벗 전문서 리뷰어로 당첨되어서 진행된 리뷰입니다.

최근 들어서 “코딩 인터뷰” 를 보는 회사들이 많이 있습니다. 그리고 이를 준비할 수 있는 인터넷 사이트들도 늘어나고 있습니다. 재미있는건 이런 걸 공부할 수 있는 사이트들이, 해당 플랫폼도 제공하는 경우가 많다는 거죠.

국내에서는 프르그래머스(https://programmers.co.kr/), 백준(https://www.acmicpc.net/) 등이 유명하고, 해외에서는 leetcode(https://leetcode.com/), HackerRank(https://www.hackerrank.com/) 등이 굉장히 유명한 사이트입니다.

제목에서 알다시피, 해당 책은 C로 알고리즘 테스트를 준비하는 책입니다. 개인적으로, 알고리즘 테스트는 c/c++/java 등으로 치르는 것보다는 python 등의 좀 더 코딩하기 편한 언어를 사용하는 것을 더 좋아하긴합니다.(흑, 갑자기 떠오르는 c로 해서 탈락했던 아픈 기억이… 그렇다고 해서 python으로 해서 붙었느냐고 물어보신다면…. 쩝…쩝…쩝…)

제가 알고리즘 테스트를 많이 풀어본것도 아니고, 그런 시험을 치는 회사에 붙어본 경험이 많지도 않지만…(떨어진 경험은… 많…) 한가지 재미있는 것은, 제가 위에서 코딩 인터뷰는 python 등의 언어를 쓰는 걸 권장한다고 말하긴 하지만, 자신이 자주 사용하는 언어에서 문제를 잘 풀어보면 확실히 유리하다는 것입니다.

그런데, 단순히 문제를 보고 답을 외우면, 쉽게 풀 수 있을가요? 이 책을 보면서 leetcode 문제를 다시 한문제를 풀어봤는데, 세번 실패하고 네번째에 성공했습니다. 그런데, 아주 재미있는 사실은… 그 전에 시도한 기록이 있고, 그 때는 한번에 풀었던…(이봐… 점점 퇴화도고 있는건가!!!) 이게 easy 난이도의 문제였는데, 반대로 그 다음에 푼 medium 난이도는 한번에 쉽게 풀었습니다.(누구냐!!! 넌!!!)

장점

“실전 대비 C 알고리즘 인터뷰” 를 읽으면서 가장 좋았던 부분은… 문제를 다양한 방법으로 푼다는 것입니다. 최소 두 가지의 풀이법을 보여주고, 보통은 세 가지의 풀이법을 보여주고 있습니다. 그리고 거기에 대해 항상 시간복잡도와 공간복잡도를 보여주고 있다는 점입니다. 흔히들 문제를 풀면서 착각하는 경우가, 시간복잡도가 가장 중요하다고 생각하는데, 사실 중요한 부분은, “요구사항” 을 맞추는 것입니다.

예를 들어, 문제에 따라, 시간 복잡도 보다, 공간복잡도를 중요시 하는 요구 사항이 있을 수 있고, 어떨때는, 이 두 가지 보다, 데이터의 기존 순서를 유지하는게 중요할 수도 있습니다.(실제 실무에서 이런 상황이 요구되는 경우가 있습니다.) 그래서 문제를 하나의 방법으로 풀었다고 해서 좋은 것이 아니라, 다양한 방법으로 풀어보는 것이 중요합니다.

책을 보다보면, 코딩 테스트 같은 종류의 문제도 있고, 용량이 줄어들면, 실제 스택의 크기를 두 배로 줄여라, 용량이 늘어나면 두 배로 늘려라 등의 그냥 일반적으로 코딩하다가 생기는 문제같은 것들에 대한것들도 꽤 있습니다.(이 방법은 실제로 여러군데서 사용하기도 합니다.)

단점

엄청난 단점은 아니지만, 문제 자체가 막 독특하다기 보다는, 구성 자체는, 다른 알고리즘 관련 책들과 마찬가지로 다양한 자료구조와 알고리즘을 다 다루고 있습니다.(정렬/트리/그래프/DP/문자열 등), 좀 어려운 문제(기반 지식이 필요한 문제)들은 관련 지식들이 많이 설명이 되어있고, 그렇지 않은 문제들은 그냥 해답만 있기도 합니다.(쉬운 문제들은 그냥 시간/공간 복잡도나 여러가지 해법 없이 그냥 단번에 풀이만 있는 경우도 뒤에는 종종 잇습니다.)

결론

워낙 문제가 많아서 문제를 전부 풀지도 못하고, 몇몇 문제들 밖에 풀어보지는 못했지만, 문제들이 평소에 본인이 생각하지 못하던 부분들을 계속 건들이게 해주는게 좋은듯 합니다.

언어와 상관없이 그 내용을 잘 이해하는 것이 좋기 때문에, 잘 이해하면 큰 도움이 될듯합니다.

[입 개발] AWS S3 503 Slow Down과 AWS S3 Versioning은 관계가 있다.

S3 버킷에서 파티션된 접두사마다 초당 3,500개의 PUT/COPY/POST/DELETE 및 5,500개의 GET/HEAD 요청을 전송할 수 있습니다. 그리고 이 한계를 넘으면 503 Slow Down 이 발생하면서 에러가 리턴되는데……

작업을 하다보면 위의 이슈를 만나게 되는 경우가 종종 생깁니다. 그런데 AWS S3 Versioning에 따라서 해당 에러가 자주 발생할 수도 있어서 주의가 필요합니다.

Spark 작업등을 하면 특정 작업을 하는 디렉토리의 상위 폴더에 _SUCCESS 가 생기면서 해당 버전이 계속 쌓이면서 문제가 발생할 수 있습니다.

이런 경우 LifeCycle을 잘 적용하거나, 작업 관련 prefix나 bucket을 분리하는게 좋을듯 합니다. 아예 버킷을 분리해서 version을 분리하는 것도 방법인듯 합니다.

https://www.waitingforcode.com/apache-spark-sql/apache-spark-success-anatomy/read

[입 개발] Redis 장애 종류 정리

간단하게 자주 발생하는 Redis 장애를 정리해봅니다. 뭐, 크게 아래의 분류를 넘어가는 일이 거의 없던걸로 기억합니다.

장애 분류소분류내용
메모리메모리 RSS 관리Redis는 In-Memory 솔루션이기 때문에 실제 물리 메모리보다 많은 메모리를 사용하게 되면, Swap으로 인해서 성능이 극심하게 떨어지게 된다. 즉 RSS 메모리 관리를 잘해야만 한다
설정기본 설정 사용* Redis는 기본적으로 기본 설정을 사용할 경우 실 서비스에서 RDB사용으로 과도한 메모리 디스크 저장으로 인해서 많은 부하를 가지게 된다. 필수로 SAVE 옵션을 끄고, 필요하다면 수동 백업이 필요하다
싱글스레드과도한 Value 크기* Redis는 싱글 스레드이기 때문에 하나의 명령이 긴 시간을 차지하면 결국 Redis 성능 저하로 이어진다.
* Hgetall, hvals 등의 collection의 데이터를 과도하게 많이 가져온다거나.
* 몇 MB 이상의 Key나 Value를 사용할 경우 문제가 발생한다.
O(N) 명령의 사용Keys나 flushdb/flushall, 큰 크기의 collection을 지우는 등의 문제 역시, Redis의 성능을 떨어트린다.

[입 개발] Spark에서 Parquet 파일 Custom Schema 로 읽어들이기

최근(?)에 다음과 같은 에러를 많이 보았습니다.

scala> val df = spark.read.parquest("s3://bucket-name/path/")
org.apache.spark.sql.AnalysisException: Parquet type not supported: INT32 (UINT_8);
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotSupported$1(ParquetSchemaConverter.scala:101)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:137)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:89)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$1.apply(ParquetSchemaConverter.scala:68)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$1.apply(ParquetSchemaConverter.scala:65)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at 

에러를 보면 Parquet type not supported: INT32 (UINT_8) 로 해당 타입을 지원하지 않는 다는 뜻입니다.

결론 부터 말하자면, Spark에서는 기본적으로 Unsigned Integer를 지원하지 않습니다. 그래서 만약 다음과 같은 Schema 의 Parquet 파일이 있다면 해당 파일을 읽는데 위와 같은 에러를 내게 됩니다. (재밌는건 Athena(아마도 Presto) 에서는 Unsigned Integer도 잘 읽어드립니다.)

Column NameColumn Type
value1INT64(LongType)
value2StringType
value3UINT8(tinyint(1))

결론부터 말하자면, 이런 경우 spark.read.parquet 만 하신다면 value3를 읽는데, 실패하지만, value3 가 필요없다면 Custom Scheme 를 주면 읽을 수 있습니다. 물론, 처음부터 Unsigned Type을 안쓰면 되지 않느냐!!! 라는 멋진 의견을 주실 수 있는데, DB를 덤프해서 parquet으로 저장하다보면, 자신도 모르게 Unsigned Type이 함께 딸려들어갈 수 있습니다. 참고로 AWS DMS(Data Migration Service)는 DB의 내용을 자동으로 덤프해서 parquet으로 만들어주는 멋진 기능이 있는데, white list 기반이 아닌 black list 기반입니다. 즉, 등록한 컬럼들만 덤프하는 것이 아니라, 명시적으로 제외한 컬럼들만 제외되서 덤프되기 때문에, 자신이 모르게 얼마든지 새로운 컬럼이 추가될 수 있습니다. 먼저 정답부터 드린다면…

위와 같은 경우에 schema 메서드를 이용해서 아주 쉽게 읽을 수 있습니다. (다만 해당 컬럼은 포기하셔야…) schema는 두 가지 형태로 제공이 가능합니다. 두 가지 방법 다 잘 동작합니다. 자세한 것은 다음 페이지를 읽어봅시다.(https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader)

spark.read.schema("value1 long, value2 STRING").parquet(path)
val customSchema = StructType(Array(
    StructField("value1", LongType, true),
    StructField("value2", StringType, true)
    ))

spark.read.schema(customSchema).parquet(path)

자 그럼 왜 이렇게 동작하는지를 알아보시죠. 그런데 사실 Parquet 은 원래부터, 특정 컬럼들만 읽어들이는 기능을 제공하고 있습니다.(http://engineering.vcnc.co.kr/2018/05/parquet-and-spark/)

여기를 보시면 https://parquet.apache.org/documentation/latest/ Parquet의 데이터 구조가 특정 컬럼들만 읽을 수 있는 Columnar 형식이라는 것을 알 수 있습니다.

4-byte magic number "PAR1" 
<Column 1 Chunk 1 + Column Metadata> 
<Column 2 Chunk 1 + Column Metadata> 
... 
<Column N Chunk 1 + Column Metadata> 
<Column 1 Chunk 2 + Column Metadata> 
<Column 2 Chunk 2 + Column Metadata> 
... 
<Column N Chunk 2 + Column Metadata>
... 
<Column 1 Chunk M + Column Metadata> 
<Column 2 Chunk M + Column Metadata> 
... 
<Column N Chunk M + Column Metadata> 
File Metadata 4-byte length in bytes of file metadata 
4-byte magic number "PAR1"

다시 처음으로 돌아가서 아까의 에러메시지를 살펴보시면, ParquetSchemaConverter.scala 라는 파일명이 나옵니다. 그럼 이제 해당 파일을 찾아봅시다. 해당 파일을 보면 대략 convertPrimitiveField 함수에서 에러가 난 것을 쉽게 찾을 수 있습니다.

  private def convertPrimitiveField(field: PrimitiveType): DataType = {
    val typeName = field.getPrimitiveTypeName
    val originalType = field.getOriginalType

    def typeString =
      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"

    def typeNotSupported() =
      throw new AnalysisException(s"Parquet type not supported: $typeString")

    def typeNotImplemented() =
      throw new AnalysisException(s"Parquet type not yet supported: $typeString")

    def illegalType() =
      throw new AnalysisException(s"Illegal Parquet type: $typeString")

    // When maxPrecision = -1, we skip precision range check, and always respect the precision
    // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
    // as binaries with variable lengths.
    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
      val precision = field.getDecimalMetadata.getPrecision
      val scale = field.getDecimalMetadata.getScale

      ParquetSchemaConverter.checkConversionRequirement(
        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
        s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")

      DecimalType(precision, scale)
    }

    typeName match {
      case BOOLEAN => BooleanType

      case FLOAT => FloatType

      case DOUBLE => DoubleType

      case INT32 =>
        originalType match {
          case INT_8 => ByteType
          case INT_16 => ShortType
          case INT_32 | null => IntegerType
          case DATE => DateType
          case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
          case UINT_8 => typeNotSupported()
          case UINT_16 => typeNotSupported()
          case UINT_32 => typeNotSupported()
          case TIME_MILLIS => typeNotImplemented()
          case _ => illegalType()
        }

      case INT64 =>
        originalType match {
          case INT_64 | null => LongType
          case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
          case UINT_64 => typeNotSupported()
          case TIMESTAMP_MICROS => TimestampType
          case TIMESTAMP_MILLIS => TimestampType
          case _ => illegalType()
        }

      case INT96 =>
        ParquetSchemaConverter.checkConversionRequirement(
          assumeInt96IsTimestamp,
          "INT96 is not supported unless it's interpreted as timestamp. " +
            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
        TimestampType

      case BINARY =>
        originalType match {
          case UTF8 | ENUM | JSON => StringType
          case null if assumeBinaryIsString => StringType
          case null => BinaryType
          case BSON => BinaryType
          case DECIMAL => makeDecimalType()
          case _ => illegalType()
        }

      case FIXED_LEN_BYTE_ARRAY =>
        originalType match {
          case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
          case INTERVAL => typeNotImplemented()
          case _ => illegalType()
        }

      case _ => illegalType()
    }
  }
 그럼 이제 저 함수를 어디서 부르는지 확인해 보면 될 것 같습니다. 그런데 convertPrimitiveField 함수는 convertField 함수에서 부르고 있습니다. 그리고 Parameter로 Type이 함께 넘어옵니다.
  def convertField(parquetType: Type): DataType = parquetType match {
    case t: PrimitiveType => convertPrimitiveField(t)
    case t: GroupType => convertGroupField(t.asGroupType())
  }

그리고 convertField 다시 convert 라는 함수에서 불려지고 있습니다. 코드를 보면 아시겠지만 파라매터로 GroupType 으로 Schema가 넘어오고 있고, map 후에 각각의 컬럼을 convertField로 처리한다는 것을 쉽게(?) 알 수 있습니다. 그럼 이제 convert를 부르는 곳을 확인해봅시다.

  private def convert(parquetSchema: GroupType): StructType = {
    val fields = parquetSchema.getFields.asScala.map { field =>
      field.getRepetition match {
        case OPTIONAL =>
          StructField(field.getName, convertField(field), nullable = true)

        case REQUIRED =>
          StructField(field.getName, convertField(field), nullable = false)

        case REPEATED =>
          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
          // elements where the element type is the type of the field.
          val arrayType = ArrayType(convertField(field), containsNull = false)
          StructField(field.getName, arrayType, nullable = false)
      }
    }

    StructType(fields.toSeq)
  }

그리고 다시 convert 함수는 readSchemaFromFooter라는 함수에서 넘겨주는 fileMetaData를 사용합니다.

  def readSchemaFromFooter(
      footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
    val fileMetaData = footer.getParquetMetadata.getFileMetaData
    fileMetaData
      .getKeyValueMetaData
      .asScala.toMap
      .get(ParquetReadSupport.SPARK_METADATA_KEY)
      .flatMap(deserializeSchemaString)
      .getOrElse(converter.convert(fileMetaData.getSchema))
  }

이렇게 주룩주룩 고구마 줄기 처럼 다시 따라가기 전에 처음으로 돌아가서 젤 먼저 호출해서 schema 메서드를 살펴봅시다. 아래와 같이 schema 는 userSpecifiedSchema 로 저장됩니다. (이 이름을 잘 기억해 둡시다. 우리가 준 Custom Schema를 담고 있습니다.)

  def schema(schema: StructType): DataFrameReader = {
    this.userSpecifiedSchema = Option(schema)
    this
  }

  def schema(schemaString: String): DataFrameReader = {
    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
    this
  }

그리고 다시 parquet 함수는 load 함수를 부르고 이것은 다시 loadV1Source를 호출하게 됩니다. 뭔가 userSpecifiedSchema 값을 넘기고 있는 것을 볼 수 있습니다.

  private def loadV1Source(paths: String*) = {
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

여기서 다시 DataSource의 resolveRelation 를 호출하고 그 안에서, 다시 getOrInferFileFormatSchema 를 호출합니다. 그리고 이 안에서 다시userSpecifiedSchema 를 건드립니다. 결론적으로 dataSchema 는 userSpecifiedSchema 가 있으면 그걸 그대로 사용하고, 없으면 format 에 맞는 inferSchema를 하면서 해당 파일내의 Schema를 가져오게 되는데, 여기서 아까 말한 convert가 호출되게 됩니다.

    val dataSchema = userSpecifiedSchema.map { schema =>
      StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
    }.orElse {
      format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())
    }.getOrElse {
      throw new AnalysisException(
        s"Unable to infer schema for $format. It must be specified manually.")
    }

그냥 요약하면 schema 메서도를 쓰면, 내부적으로 Parquet의 전체 Schema를 읽지 않고, 주어진 Schema로 읽어오기 때문에 문제가 없다라고 보시면 될 것 같습니다. 이게 무슨 소리냐!!!

이 글은, 제가 Parquet에서 Unsigned Integer를 읽으면서 에러가 난다라고 하자, 아 그거 될텐데요 하면서 순식간에 Spark 코드를 찾아서 이 과정을 알려주신 옆자리 동료님께 바칩니다.

[책 리뷰] 파이썬 알고리즘 인터뷰

개인적으로 먼저 고백하자면, 나는 알고리즘 인터뷰를 그렇게 좋아하지 않는다.(내가 좋은 결과를 본적이 없어서 그렇다 크하하하…) 2012년에 아마존 RDS팀과 트위터 캐시팀과 면접을 본적이 있는데, 아마존은 전화면접에서…(생애 첫 해외 면접이었던…), 트위터는 On-site 인터뷰(본사에서 직원들과 하루종일 보게 되는… 이 날, 5번의 면접과 1번의 점심 면접을 본…) 를 샌프란시스코에서 보고 딱 떨어졌다.

그때는 사실 해외의 IT면접이라는게 이렇게 알고리즘 면접이라는 걸 몰랐던 시기이기도 하고… 면접이라는 압박감에 떨기도 했는데…

사람들마다 다른 생각이 있겠지만, 개인적으로 나는 알고리즘 인터뷰라는 건, 일종의 학습이라고 생각한다. 즉, 누군가 천재라서 모르는 문제를 딱하고 한번에 풀어내는게 아니라, 해당 분야에 대해서 약간의 경험이 있는게 좋다라는 것, 아는 분 중에 구글에 가신 한분은 구글은 400문제 풀면 붙고, 그것보다 덜 풀면 떨어진다고 하셨는데, 입사 동기들도 다 400문제 이상씩 풀고 들어왔다라는 얘기를 해주셨는데(리트코드, 해커랭크 같은 사이트들) 많이 풀면서 어떤 방식들을 사용하는지, 기저에 깔려있는 내용들을 이해해야 한다.

그리고 실제 풀이에 들어가면, 개인적으로는 C/C++/JAVA 보다는 좀 더 생각을 표현하기 쉬운 언어를 좋아하는데, 파이썬 같은 경우도 대부분의 문제 풀이 사이트나, 회사들에서도 알고리즘 인터뷰에서 허용하는 언어이다. 튜플이나, 맵, 리스트 같은걸 표현하기도 쉽고…

또한, 알고리즘 인터뷰가 무조건 맞춰야 유리하기는 하지만, 갑자기 뿅 하고 정답만을 말한다면 도리어 떨어지기 더 쉬울 수도 있다, 내가 이 문제를 어떻게 이해했는지 설명하고, 모르면 힌트를 요구해도 된다. 사실 알고리즘 인터뷰는 문제를 푸는 과정에서 지원자가 어떻게 자신의 내용을 표현하고, 커뮤니케이션을 하는지를 보는 것도 큰 일부중에 하나이기 때문에, 힌트도 주고, 그 힌트에서 정답을 향해 잘 가는 모습을 보면 더 좋아한다. 그러나, 당연히 기본적인 지식을 잘 알고 이를 풀 수 있는 모습을 보여주는게 첫번째이므로, 핵심 내용을 잘 알아야 한다.

개인적으로 이 책을 보면서, 신기했던 부분은 문제 풀이가 거의 대부분 크게 2가지 형태로 되어 있다. 일반적으로 문제를 해결하는 방법과 파이썬의 특성을 이용해서 이용하는 방법이다. 개인적으로 알고리즘 인터뷰면 일반적으로 문제를 해결해야 하는 방법으로 풀어야 하지 않을까 생각을 하지만, 파이썬의 특성을 이용해서 해결 하는 방법은 훨씬 더 간결하다.(실무에서 구현해야 한다면 파이썬의 특성을 이용하는 방법을 쓰는게 더 좋다.) 그리고 거기에 대한 성능 측정도 함께 해두었는데…

초반에 나오는 문자열 뒤집기 같은 경우는 이걸 출력만 하면 되는지, 아니면, 실제 값을 바꾼채로 전달해야 되는지에 따라서, 뒤에서 부터 그냥 출력하는 방법과, 처음과 끝에서 swap을 이용해서 처리할 수 있는 방법이 있는데… 파이썬에서 제공하는 방법을 쓰면 그냥 reverse() 함수를 쓰면된다. reverse 함수는 리스트에서만 적용할 수 있는데… 스트링은 [::-1] 이런 방식을 통해서도 가능하다.(나도 이 문법은 이번에 처음… 배운… 크…)

재미난 건 이런 방식들이 리트코드에서 되는지 안되는지 이런 부분도 나온다는 것… 상길님의 엄청난 노력이 눈에 보이는듯하다.

책 전체에서 지금까지의 알고리즘 서적, 또는 알고리즘 인터뷰 서적과는 조금 다르게 만들고 싶어하시는게 보이는데, 예전에 PHP스쿨로 유명하신 정진호님의 일러스트도 읽는데 굉장히 도움을 준다.

당연히 알고리즘 서적이기 때문에, 쉽게 적었다고 해서 읽기에도 무조건 쉬운책이 아니라서, 이해를 하려고 노력을 많이 해야 할듯 하다. 문제를 하나하나 이해하면서 풀어본다면, “파이썬 알고리즘 인터뷰” 라는 책은 알고리즘 인터뷰를 해야 하는 상황이거나, 그렇지 않은 상황이라도, 파이썬을 쓴다면 파이썬 답게 문제를 해결하는 방법을 알려주는 좋은 길잡이가 될것이다.

어떤 언어를 쓴다면, 해당 언어를 잘 알고 써야 한다는 상길님의 말은… 나도 꽤 동감하는 편이다. 물론 모든 언어를 내부까지 잘 알고 쓸 수는 없겠지만…(입금 드리븐 으로 일해야 할 경우도 있으니…) 내가 뭔가를 공부하고자 하는 언어라면… 그 언어에 맞게 짜는 건 꼭 필요한 일이 아닌가 싶다.

[입개발] Druid에서 transform 시 알아야 할 팁.

Apache Druid 는 빠른 속도로 데이터를 Aggregation 할 수 있는 툴이지만, 처음 사용하면 이것저것 미묘하게 힘든 부분들이 있다.

다음과 같은 걸 기억해두자. transform에서 오래걸렸던 부분은 컬럼명은 더블 쿼터로 “__time” 이런식으로 그리고 날짜 포맷부분등은 리터럴이라 쿼터로 ‘yyyy-MM-dd’ 형식으로 감싸야 한다는 것이다. TimeZone 도 리터럴이다.


{
"type": "index_parallel",
"spec": {
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"prefixes": [
"s3://test-bucket/path1/path2/2020-06-30/"
]
},
"inputFormat": {
"type": "parquet"
},
"appendToExisting": true
},
"tuningConfig": {
"type": "index_parallel",
"partitionsSpec": {
"type": "dynamic"
},
"maxNumConcurrentSubTasks": 4
},
"dataSchema": {
"dataSource": "test_druid_log",
"granularitySpec": {
"type": "uniform",
"queryGranularity": "HOUR",
"rollup": true,
"segmentGranularity": "HOUR"
},
"timestampSpec": {
"column": "timestamp_column",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"id",
"date"
"name",
"email",
"country",
"user_id",
"service"
]
},
"transformSpec": {
"transforms": [
{
"type": "expression",
"name": "date",
"expression": "timestamp_format(\"__time\", 'yyyy-MM-dd', 'Asia/Seoul')"
}
]
}
}
}
}

[Tip] Spring Boot 2.1.0 에서의 CORS 설정

뭔가 내가 손대면 안되는 건지… 인터넷에서 찾으면 꽤 많은 방법이 나온다.

좀 예전 글이긴 하지만 다음 문서를 보면 XML로 설정하는 방법

<mvc:cors>
<mvc:mapping path="/api/"
allowed-origins="http://domain1.com, http://domain2.com
allowed-methods="GET, PUT
allowed-headers="header1, header2, header
exposed-headers="header1, header2" allow-credentials="false"
max-age="123" />
<mvc:mapping path="/resources/
"
allowed-origins="http://domain1.com/
</mvc:cors>

아니면 addCorsMappings 을 override 하는 방법

@Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**").allowedOrigins("http://localhost:4200");
            }

아니면 corsFilter() 함수를 Bean으로 노출하는 방법

@Bean
	public FilterRegistrationBean corsFilter() {
		UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
		CorsConfiguration config = new CorsConfiguration();
		config.setAllowCredentials(true);
		config.addAllowedOrigin("http://domain1.com");
		config.addAllowedHeader("*");
		config.addAllowedMethod("*");
		source.registerCorsConfiguration("/**", config);
		FilterRegistrationBean bean = new FilterRegistrationBean(new CorsFilter(source));
		bean.setOrder(0);
		return bean;
	}


 그런데 xml은 내가 시도해보지 않았고, 2,3번은 다 안되었다.(왜 안되는지는 시간이 없어서 확인 안함...)
최후에 되는 방법은 그냥 Filter를 이용한 방법이다. 왜 이것만 되지 -_-;;;


@Component
@Slf4j
public class CORSFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
        HttpServletResponse resp = (HttpServletResponse) response;
        resp.setHeader("Access-Control-Allow-Origin", "*");
        resp.setHeader("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE");
        resp.setHeader("Access-Control-Max-Age", "3600");
        resp.setHeader("Access-Control-Allow-Headers", "*");
        chain.doFilter(request, response);
    }

    @Override
    public void destroy() {

    }
}