[입 개발] redis-cli 에서 –rdb는 주의해서 사용하셔야 합니다.

최근에 지인에게서 문의가 왔습니다. 뭔가 특별히 한게 없는데, Redis 서버가 죽어버리고 재시작했다고, 이것저것 알아보니, 일단 메모리를 많이 사용하고 있던 상태였습니다. 웬지 느낌은, 아주 전통적인 Redis가 데이터 저장을 위해서 fork하면서 메모리를 추가로 사용하고, 이걸로 인해서 OOM등이 발생한게 아닐까라고 생각을 했는데…

지인의 얘기로는 다음과 같은 –rdb 명령을 사용했다고 합니다.

redis-cli --rdb

일단 저도 한번도 해당 옵션을 써본적이 없어서 소스를 확인해보니 다음과 같습니다. 해당 소스는 redis-cli.c를 보시면 됩니다.

        } else if (!strcmp(argv[i],"--rdb") && !lastarg) {
            config.getrdb_mode = 1;
            config.rdb_filename = argv[++i];
        }

getrdb_mode라는 옵션을 셋팅합니다. 해당 부분을 다시 찾아보면 getrdb_mode가 켜져있을 때 sendCapa()라는 함수와 getRDB를 호출합니다.

    /* Get RDB mode. */
    if (config.getrdb_mode) {
        if (cliConnect(0) == REDIS_ERR) exit(1);
        sendCapa();
        getRDB(NULL);
    }

sendCapa를 호출해서 eof를 설정하면 diskless replication을 시도합니다. 여기서 주의할 것은 diskless replication은 서버에 RDB파일을 만들지 않을뿐, fork를 해야하는 것은 동일합니다. 그리고 이 부분이 문제를 일으키게 됩니다. 다만 여기서는 실제 RDB를 만들지 않고 해당 클라이언트에게는 diskless replication을 하라고 설정만 하는 단계입니다.(다만 서버가 repl_diskless_sync를 켜두지 않으면 만구땡…)

void sendCapa() {
    sendReplconf("capa", "eof");
}

void sendReplconf(const char* arg1, const char* arg2) {
    printf("sending REPLCONF %s %s\n", arg1, arg2);
    redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);

    /* Handle any error conditions */
    if(reply == NULL) {
        fprintf(stderr, "\nI/O error\n");
        exit(1);
    } else if(reply->type == REDIS_REPLY_ERROR) {
        fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
        /* non fatal, old versions may not support it */
    }
    freeReplyObject(reply);
}

getRDB 함수는 이제 실제로 SYNC 명령을 보내서 replication을 수행합니다. 원래의 Replication은 그 뒤로 계속 master로 부터 데이터를 받아야 하지만, 여기서 getRDB는 그냥 현재 메모리 상태의 스냅샷만 전달하고 종료하게 됩니다.

코드를 보면 sendSync 후에 payload를 가져오게 되는데, 해당 payload는 RDB생성시 diskless_sync 방식이 아닐경우에만 파일 사이즈를 전달해줘서 가져올 수 있고, diskless_sync 방식을 쓸때는 eof_mark 라는 40바이트의 랜덤값을 처음에 전달해주고, 마지막에 해당 값이 나오면 RDB생성을 종료하게 됩니다.

static void getRDB(clusterManagerNode *node) {
    int s, fd;
    char *filename;
    if (node != NULL) {
        assert(node->context);
        s = node->context->fd;
        filename = clusterManagerGetNodeRDBFilename(node);
    } else {
        s = context->fd;
        filename = config.rdb_filename;
    }
    static char eofmark[RDB_EOF_MARK_SIZE];
    static char lastbytes[RDB_EOF_MARK_SIZE];
    static int usemark = 0;
    unsigned long long payload = sendSync(s, eofmark);
    char buf[4096];

    if (payload == 0) {
        payload = ULLONG_MAX;
        memset(lastbytes,0,RDB_EOF_MARK_SIZE);
        usemark = 1;
        fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer "
                "until EOF marker to '%s'\n", filename);
    } else {
        fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
            payload, filename);
    }

    /* Write to file. */
    if (!strcmp(filename,"-")) {
        fd = STDOUT_FILENO;
    } else {
        fd = open(filename, O_CREAT|O_WRONLY, 0644);
        if (fd == -1) {
            fprintf(stderr, "Error opening '%s': %s\n", filename,
                strerror(errno));
            exit(1);
        }
    }

    while(payload) {
        ssize_t nread, nwritten;

        nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
        if (nread = RDB_EOF_MARK_SIZE) {
                memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
            } else {
                int rem = RDB_EOF_MARK_SIZE-nread;
                memmove(lastbytes,lastbytes+nread,rem);
                memcpy(lastbytes+rem,buf,nread);
            }
            if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
                break;
        }
    }
    if (usemark) {
        payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
        if (ftruncate(fd, payload) == -1)
            fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
        fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
    } else {
        fprintf(stderr,"Transfer finished with success.\n");
    }
    close(s); /* Close the file descriptor ASAP as fsync() may take time. */
    fsync(fd);
    close(fd);
    fprintf(stderr,"Transfer finished with success.\n");
    if (node) {
        sdsfree(filename);
        return;
    }
    exit(0);
}

다시 정리하자면 redis-cli에서 –RDB 옵션을 주면 서버에 diskless_sync를 요청합니다.(서버가 설정이 되어있어야만 동작합니다.) 그러나 해당 작업은 Redis 서버를 fork하게 만들기 때문에, 메모리 사용량이 높고 Write가 많은 상황에서는 절대적으로 피하시는게 좋습니다.

Advertisements

[입개발] Spring-data-redis 에서 Jedis로 TLS를 쓰면서 인증서 체크는 Disable 하는 방법

흐음… 최근에 Redis를 TLS로 사용해야 할 일이 생겼습니다.(아시는 분은 아시겠지만, 저는 자바맹, Spring 맹이라…) Spring-data-redis 를 쓰는 서비스에서 TLS를 키는 것은 사실 아주아주아주 쉽습니다. 일단 저희는 spring-data-redis에서 Jedis를 쓰고 있는 상황입니다.(여기서 왜 lettuce 안쓰고 Jedis 쓰냐고 물으시면, 원래 그렇게 만들어져 있었으니까라고 대답을 ㅎㅎㅎ)

그런데 JedisConenctionFactory에서 TLS를 쓰는 건 아주 쉽습니다. 그냥 setUseSsl(true)만 호출하면 그 때부터 TLS가 딱!!! 됩니다.

딱!!! 되는데 제대로 접속하기 위해서는 인증서가 제대로 된 위치에 존재해야 합니다. 그런데, 내부 레디스 서버마다 인증서를 만들기는 귀찮을 수 있습니다. 그럴때는 살포시… 인증서를 무시해주면 되는데…

구글링을 해봐도, Spring Data Redis에 TLS를 적용하는 방법은 많이 나옵니다. 또, 인증서 체크를 Disable 하는 방법도 꽤 많이 나옵니다. 그런데 인증서 체크를 끄는 법은 다 HTTPS 관련 Rest Template 설정에 관한 것들입니다. 즉 아주 편하게 할 수 있는 Spring Data Redis 에서 TLS를 인증서 체크를 끄는 방법이 검색해도 안나오더라는…(아마도 제가 못 찾은걸 확신합니다만…)

일단 기본적인 방법은 다음과 같습니다.(다음 StackOverflow 를 참고합니다.)

  1. TrustManager를 생성한다.
  2. SSLContext를 가져온다.
  3. SSLContext의 init를 아까 생성한 TrushManager를 이용하도록 설정한다.
  4. 해당 SSLContext의 SSLSocketFactory를 이용하도록 설정한다.
import javax.net.ssl.*;
import java.security.*;
import java.security.cert.X509Certificate;

public final class SSLUtil{

    private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[]{
            new X509TrustManager() {
                public java.security.cert.X509Certificate[] getAcceptedIssuers(){
                    return null;
                }
                public void checkClientTrusted( X509Certificate[] certs, String authType ){}
                public void checkServerTrusted( X509Certificate[] certs, String authType ){}
            }
        };

    public  static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
        // Install the all-trusting trust manager
        final SSLContext sc = SSLContext.getInstance("SSL");
        sc.init( null, UNQUESTIONING_TRUST_MANAGER, null );
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
    }

    public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
        // Return it to the initial state (discovered by reflection, now hardcoded)
        SSLContext.getInstance("SSL").init( null, null, null );
    }

    private SSLUtil(){
        throw new UnsupportedOperationException( "Do not instantiate libraries.");
    }
}

위의 코드는 HttpsURLConnection 을 위한 SSL 인증서 체크를 무시하는 방법입니다. 다만, 우리는 이걸 보면, 아 Spring Data Redis의 JedisConnectionFactory 도 유사하다고 추측할 수 있습니다. 실제로 JedisConnectionFactory 코드를 보면 SSLSocketFactory를 가져오는 함수는 보이지 않습니다.

조금 더 파보면 JedisConnectionFactory 안에는 clientConfiguration 변수가 있고, 다음과 같은 setSslSocketFactory, getSslSocketFactory 와 같은 함수들이 보입니다.

static class MutableJedisClientConfiguration implements JedisClientConfiguration {
......
		@Override
		public Optional getSslSocketFactory() {
			return Optional.ofNullable(sslSocketFactory);
		}

		public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
			this.sslSocketFactory = sslSocketFactory;
		}

		/*
		 * (non-Javadoc)
		 * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters()
		 */
		@Override
		public Optional getSslParameters() {
			return Optional.ofNullable(sslParameters);
		}

		public void setSslParameters(SSLParameters sslParameters) {
			this.sslParameters = sslParameters;
		}
......
}

그리고 createJedis 같은 함수를 보면 아래와 같이 Jedis 인스턴스를 생성할 때 clientConfiguration의 getSslSocketFactory 함수를 쓰고 있는걸 볼 수 있습니다.

	private Jedis createJedis() {

		if (providedShardInfo) {
			return new Jedis(getShardInfo());
		}

		Jedis jedis = new Jedis(getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), isUseSsl(),
				clientConfiguration.getSslSocketFactory().orElse(null), //
				clientConfiguration.getSslParameters().orElse(null), //
				clientConfiguration.getHostnameVerifier().orElse(null));

		Client client = jedis.getClient();

		getRedisPassword().map(String::new).ifPresent(client::setPassword);
		client.setDb(getDatabase());

		return jedis;
	}

오호 이제 저 setSslSocketFactory 함수를 통해서 아까 얻은 SSLContext의 SSLSocketFactory 로 바꿔주면 될듯 합니다. 그런데 아주 사소한 문제가 있습니다. 어떻게 JedisConnectionFactory에서 저 값을 바꿀 수 있을까요? 살짝 살펴보니 오오 다음과 같은 함수가 존재합니다.

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
        ......
        public JedisClientConfiguration getClientConfiguration() {
		return clientConfiguration;
	}
        ......
}

그런데, 오예!! 하면서 받아서 setSslSocketFactory를 호출해보려고 하면…. 문제가 발생합니다. 그것은!!!, JedisClientConfiguration 이 interface 인데… ReadOnly Interface라는 것입니다. 대략 다음과 같습니다.

public interface JedisClientConfiguration {
	boolean isUseSsl();
	Optional getSslSocketFactory();
	Optional getSslParameters();
	Optional getHostnameVerifier();
	boolean isUsePooling();
	Optional getPoolConfig();
	Optional getClientName();
	Duration getConnectTimeout();
	Duration getReadTimeout();
        ......
}

흐음, 그러면 어차피 내부는 MutableJedisClientConfiguration 클래스이니, 그냥 강제 형변환해버리면 되지 않을까요? 일단 제가 자바에 깊은 지식이 없어서… 실패했을 수도 있지만, Inner Class 고, 같은 패키지가 아니면 사용할 수가 없습니다. 그럼 무슨 말이야… 앞에 시도한 방법은 모두 실패… 우리는 이상한 산으로 가고 있던 것입니다.

그럼 어떻게 해야 하며, 해당 소스를 살펴보니, 다행히 다음과 같은 생성자가 있습니다. 잘 살펴보면 RedisStandaloneConfiguration 과 JedisClientConfiguration 을 생성자로 받고 있습니다.

public JedisConnectionFactory(RedisStandaloneConfiguration standaloneConfig, JedisClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null!");

		this.standaloneConfig = standaloneConfig;
	}

이제 대충 방법이 떠오르시나요? 즉, 다음과 같이 JedisClientConfiguration 을 상속받는 다른 클래스를 만들어서 JedisConnectionFactory 의 생성자로 넘기면, 우리가 원하는 동작을 하도록 만들 수가 있습니다.

대략 저는 다음과 같은 JedisSSLClientConfiguration 이라는 클라스를 만들었습니다. 그리고 JedisConnectionFactory 에 넘겨주니, 인증서 체크를 잘 회피하면서 동작하게 되었습니다. 역시 소스를 보면 길이 보이는 경우가 종종 있네요. 자바를 잘하시는 분들이 부럽습니다. 전 맨날 삽질만…

    JedisSSLClientConfiguration() {
        setUsePooling(true);
        setUseSsl(true);

        TrustManager[] trustAllCerts = new TrustManager[] {
                new X509TrustManager() {
                    public java.security.cert.X509Certificate[] getAcceptedIssuers()
                    {
                        return new X509Certificate[0];
                    }
                    public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    {
                    }
                    public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    {}
                }
        };

        try {
            SSLContext sc = SSLContext.getInstance("TLS");
            sc.init(null, trustAllCerts, null);
            sslSocketFactory = sc.getSocketFactory();
            sslParameters = new SSLParameters();
            sslParameters.setEndpointIdentificationAlgorithm("");
        } catch (Exception e) {
            throw new RuntimeException(e.toString());
        }

        hostnameVerifier = new HostnameVerifier() {
            @Override
            public boolean verify(String s, SSLSession sslSession) {
                return true;
            }
        };
    }

[입 개발] Redis 버그 – Dataset 사이즈가 200GB가 넘어가면 죽는다구요?

오늘은 최근에 이슈가 되었던, Redis 버그에 대해서 분석해보는 시간을 가지도록 하겠습니다.
해당 이슈는 Redis issue 4493 를 보시면 됩니다.

실제로 해당 이슈는 2017년 11월 30일에 올라왔습니다. 실제로 현재 버전에는 다 패치가 되어있습니다. 그러나 DBMS나 캐시등의 툴은 큰 보안버그가 없는 이상 업데이트가 굉장히 느립니다.(성능에 영향을 주기 때문에…)

다른 건 일단 모두 제외하고 어떤 이슈가 있고, 어떻게 패치되었는지 확인해보도록 하겠습니다.

일단 데이터가 200GB가 넘어가면 죽는다는 뭔가 쉽게이해하기 어려운 상황입니다. Redis 는 기본적으로 Key에 512MB까지 그리고 Value에 512MB가 할당되고, 메모리가 넘치기 전까지는 저장이 되어야 합니다. 그런데 보통 특정 사이즈가 문제가 되는것은, 누구나 예측이 되는 아주 간단한 이유가 있습니다. 바로 overflow 나 underflow, 이런 생각을 가지고 가면, 문제가 좀 더 해결하기 쉽지만, 갑자기 Redis가 죽으면 알기가 힘들죠.(이건 다, 우리는 현재 모든 정보를 알고 있기 때문에 쉽게 이해를 할 수 있는…)

Redis 는 보통 죽기 전에 자신의 정보를 뿌리고 죽는데, 위의 링크를 보시면 다음과 같은 정보들이 있습니다. 우와 755GB 메모리를 가진 장비에 211GB의 메모리를 쓰고 있네요. 부럽습니다. 사실 메모리 영역만 보면 사실 큰 문제가 될께 없어보입니다.

Memory
used_memory:226926628616
used_memory_human:211.34G
used_memory_rss:197138104320
used_memory_rss_human:183.60G
used_memory_peak:226926628616
used_memory_peak_human:211.34G
used_memory_peak_perc:117.84%
used_memory_overhead:137439490190
used_memory_startup:486968
used_memory_dataset:89487138426
used_memory_dataset_perc:39.43%
total_system_memory:811160305664
total_system_memory_human:755.45G

Keyspace
db2:keys=2147483651,expires=0,avg_ttl=0

그런데 Keyspace를 보니 조금 다르네요. db2의 key가 2147483651개가 있습니다. 흐음…

다음과 같이 signed 변수들의 범위를 보면

      char : 127
      short : 32767
      int : 2147483647

입니다. 흐음 일단 key의 개수가 int의 범위를 넘어갔네요. 2147483651을 음수로 바꾸면
-2147483645 이 됩니다. 흐음… 이렇게 바뀌면 문제가 발생할 수도 있겠네요. 그런데 뭔가 이상합니다.

분명히 해당 info정보에서는 제대로 2147483651 가 나와있는데요?
해당 정보를 출력하는 info Command는 실제로 genRedisInfoString 라는 함수를 이용합니다.
아래 코드를 보면 keys, vkeys는 long long 입니다. 그걸 사용하고 있는 dictSize 함수는 매크로로 그냥 값을 가져옵니다.
실제로 redisDb 구조체는 dict를 가지고 있고 dict는 다시 dictht 라는 해시 테이블을 가지고 있습니다. 거기서 used 변수를
가져오는게 dictSize 함수입니다.

#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

sds genRedisInfoString(char *section) {
    sds info = sdsempty();
    ......

    /* Key space */
    if (allsections || defsections || !strcasecmp(section,"keyspace")) {
        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info, "# Keyspace\r\n");
        for (j = 0; j < server.dbnum; j++) {
            long long keys, vkeys;

            keys = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (keys || vkeys) {
                info = sdscatprintf(info,
                    "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
                    j, keys, vkeys, server.db[j].avg_ttl);
            }
        }
    }
    return info;
}

그런데 used라는 변수는 unsigned long 입니다. 32bit에서는 문제가 될 수 있지만, 64bit에서는 8바이트가 할당되는 변수입니다.
64bit 에서 다음과 같은 코드를 돌려보시면 알 수 있습니다.

#include

int main(int argc, char *argv[]) {
printf(“%d\n”, sizeof(unsigned long));
return 0;
}

그럼 제대로 64bit가 되어있는데 무엇이 문제인가!!!, 처음부터 다 분석하면 어려우니 해당 문제를 일으키는 곳을 바로 확인해 보겠습니다. 코멘트를 자세히 읽어보면… 실제로 아이템을 추가할 때가 문제가 됩니다. 다시 앞으로 돌아가서 아이템 개수가 int의 범위를 넘어갔다는 것을 기억해둡니다.

다음 dictAddRaw 함수를 보면 index가 int 입니다. 헉… 바로 눈치 채시겠죠. 저기 int 로 인해서 아래의 ht->table[index]
라는 코드가 overflow 로 음수가 들어가게 됩니다. 바로 Redis는 저세상으로…

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry; /* used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

그럼 해당 실제 patch는 어떻게 적용되었을까요?

해당 코드를 보시면 해당 int는 long으로, 그 외에 많은 부분들이 64bit unsigned 또는 signed 로 변경된것을 볼 수 있습니다.
실제로 해당 PR은 두개로 나뉘어져 각각 반영되었고, Redis 4.0.7에 다음 두개의 commits으로 볼 수 있습니다.
dict: fix the int problem for defrag

dict: fix the int problem

즉 해당 버그를 피하실려면 최소한 4.0.7 이후의 버전을 선택하셔야 하고 가능하면 최신 버전을 고르시는 걸 추천드립니다.
그런데 21억개를 넘어가는 아이템이라… 웬만한 규모에서는 발생하지 않을 문제겠지만, 엄청 많은 데이터를 쓰는 곳에서만 발생할 수 있었던 문제로 보입니다.

[입 개발] Hive MetaStore 에서 Location은 어떻게 관리될까?

최근에 아주 이상한 에러를 경험했습니다. 다음과 같은 managed table 이 있다고 가정합니다.

CREATE TABLE `test1`(
  `id` bigint
PARTITIONED BY (
  `datestamp` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://a.b.c:8020/user/hive/warehouse/charsyam.db/test1'
TBLPROPERTIES (
  'transient_lastDdlTime'='1556186715')

test1 이라는 table에 drop table 을 시도했는데!!! 다음과 같은 에러가 발생했습니다.

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:java.lang.IllegalArgumentException: Wrong FS: hdfs://a.b.c:8020/user/hive/warehouse/charsyam.db/test1, expected: hdfs://ip-b.b.c:8020)

사실 이 에러는 hadoop 에서 checkPath 라는 함수에 의해서 발생하게 됩니다.(https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L711) 코드를 보면 실제 schema에 있는 Location 경로와 실제 hadoop 에서 인식하는 자신의 도메인이 맞지 않을 때 발생하는 에러입니다. 즉 a.b.c 도 ip 가 1.1.1.1 이고 b.b.c 도 ip 가 1.1.1.1 이지만 table location 이 hdfs://a.b.c:8020 인것과 hdfs://b.b.c:8020 은 서로 다른 도메인이 되는 거죠.

  protected void checkPath(Path path) {
    URI uri = path.toUri();
    String thatScheme = uri.getScheme();
    if (thatScheme == null)                // fs is relative
      return;
    URI thisUri = getCanonicalUri();
    String thisScheme = thisUri.getScheme();
    //authority and scheme are not case sensitive
    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
      String thisAuthority = thisUri.getAuthority();
      String thatAuthority = uri.getAuthority();
      if (thatAuthority == null &&                // path's authority is null
          thisAuthority != null) {                // fs has an authority
        URI defaultUri = getDefaultUri(getConf());
        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
          uri = defaultUri; // schemes match, so use this uri instead
        } else {
          uri = null; // can't determine auth of the path
        }
      }
      if (uri != null) {
        // canonicalize uri before comparing with this fs
        uri = canonicalizeUri(uri);
        thatAuthority = uri.getAuthority();
        if (thisAuthority == thatAuthority ||       // authorities match
            (thisAuthority != null &&
             thisAuthority.equalsIgnoreCase(thatAuthority)))
          return;
      }
    }
    throw new IllegalArgumentException("Wrong FS: " + path +
                                       ", expected: " + this.getUri());
  }

그런데 이런 일이 왜 일어날까가 사실 더 궁금할껍니다. 보통은 발생하면 안되는 일이죠. 사실 이런 현상은 Hadoop 노드를 계속 새로운 장비에 재 구축하는데, Hive Metastore 는 동일하게 저장하기 때문에 발생하는 현상입니다. 보통 하둡 장비들이 크게 바뀔일이 없으니… 거의 발생할 일도 없는거죠. 그런데 이런 일이 발생하게 되면, 다음과 같은 에러를 맞게 됩니다. 그래서 이걸 파다보니 실제로 Hive MetaStore 에서 Location 을 어떻게 저장하는가가 궁금해졌습니다. 보통 우리는 show create table 등으로 볼 수 있습니다. 그럼 이 정보들은 어디에 저장될까요?

일단 Hive Metastore 는 실제 저장은 다른 곳 DBMS나 glue 등에 저장을 할 수 있습니다. 여기서는 MYSQL에 저장된 것으로 설명을 하겠습니다. 먼저 mysql 에 hive_metastore 라는 DB가 생성이 됩니다. 그리고 여기서 우리가 살펴볼 table 은 DBS, TBLS, SDS 입니다.(이거 세 개만 보면 됩니다.) 마지막 S는 전부 복수의 S로 보이고 각각 Database, Table, StorageDescriptor 로 보입니다.

먼저 hive 등에서 create database 로 DB를 생성할 때 마다 해당 정보가 Database 에 저장이 됩니다. 먼저 DBS 테이블의 스키마는 다음과 같습니다.

CREATE TABLE `DBS` (
  `DB_ID` bigint(20) NOT NULL,
  `DESC` varchar(4000) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `DB_LOCATION_URI` varchar(4000) CHARACTER SET latin1 COLLATE latin1_bin NOT NULL,
  `NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `OWNER_NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `OWNER_TYPE` varchar(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`DB_ID`),
  UNIQUE KEY `UNIQUE_DATABASE` (`NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

여기서 주목할 것은 역시 primary key 인 DB_ID 와 DB_LOCATION_URI 입니다. 만약 charsyam db라면, 이 DB_LOCATION_URI 에 들어가는 값이 hdfs://a.b.c/user/hive/warehouse/charsyam.db/ 까지가 들어가게 됩니다. 그리고 이 db에 테이블이 생성되면 table의 location이 항상 이 밑에 생기게 됩니다.

실제로 TBLS 테이블을 보면 내부에 Location이 없습니다. 그래서 해당 DBS의 DB_LOCATION_URI를 바꾸면 바로 되지 않을까라고 생각을 처음에 했었는데… 사실 이건 말도 안되는 소리입니다. 왜냐하면 hive 등의 툴을 써보신 분들은 바로 아시겠지만, location 자체는 얼마든지 다른 것으로 바꿀 수 있습니다. 그렇다면 무엇인가? 테이블에는 자기만의 Location 정보가 꼭 있어야 한다는 것입니다. 즉 TBLS에 없다면, 다른곳에 있어야 합니다. 다음은 TBLS 의 schema 입니다.

CREATE TABLE `TBLS` (
  `TBL_ID` bigint(20) NOT NULL,
  `CREATE_TIME` int(11) NOT NULL,
  `DB_ID` bigint(20) DEFAULT NULL,
  `LAST_ACCESS_TIME` int(11) NOT NULL,
  `OWNER` varchar(767) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `RETENTION` int(11) NOT NULL,
  `SD_ID` bigint(20) DEFAULT NULL,
  `TBL_NAME` varchar(256) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `TBL_TYPE` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `VIEW_EXPANDED_TEXT` mediumtext,
  `VIEW_ORIGINAL_TEXT` mediumtext,
  `IS_REWRITE_ENABLED` bit(1) NOT NULL,
  PRIMARY KEY (`TBL_ID`),
  UNIQUE KEY `UNIQUETABLE` (`TBL_NAME`,`DB_ID`),
  KEY `TBLS_N50` (`SD_ID`),
  KEY `TBLS_N49` (`DB_ID`),
  CONSTRAINT `TBLS_FK1` FOREIGN KEY (`SD_ID`) REFERENCES `SDS` (`SD_ID`),
  CONSTRAINT `TBLS_FK2` FOREIGN KEY (`DB_ID`) REFERENCES `DBS` (`DB_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

그렇다면 해당 정보는 어디에 있을까요? 당연히!!! 아까 언급했지만, 아직까지 안나온 SDS 입니다. 어떻게 SD 가 StorageDescriptor라고 생각하냐고 하면… Hive 소스를 보면 ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java 에 getPath 라는 메서드가 있고 여기서 getSd() 라는 메서드를 호출합니다.

final public Path getPath() {
    String location = tTable.getSd().getLocation();
    if (location == null) {
      return null;
    }
    return new Path(location);
  }

이걸 자세히 보면 다음과 같이 StorageDescriptor 라는 클래스를 리턴합니다. 아마도 그러니 저것도…

  public StorageDescriptor getSd() {
    return this.sd;
  }

이제 SDS 테이블을 살펴보시죠. Location 항목이 보입니다. 여기에 저장이 되는겁니다.

CREATE TABLE `SDS` (
  `SD_ID` bigint(20) NOT NULL,
  `CD_ID` bigint(20) DEFAULT NULL,
  `INPUT_FORMAT` varchar(4000) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `IS_COMPRESSED` bit(1) NOT NULL,
  `IS_STOREDASSUBDIRECTORIES` bit(1) NOT NULL,
  `LOCATION` varchar(4000) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `NUM_BUCKETS` int(11) NOT NULL,
  `OUTPUT_FORMAT` varchar(4000) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `SERDE_ID` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`SD_ID`),
  KEY `SDS_N49` (`SERDE_ID`),
  KEY `SDS_N50` (`CD_ID`),
  CONSTRAINT `SDS_FK1` FOREIGN KEY (`SERDE_ID`) REFERENCES `SERDES` (`SERDE_ID`),
  CONSTRAINT `SDS_FK2` FOREIGN KEY (`CD_ID`) REFERENCES `CDS` (`CD_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

그런데 이거랑 실제 Table과 어떻게 매칭을 시킬 수 있을까요? 그 비밀은 Primary Key인 SD_ID가 중요합니다. 위에서 언급한 TBLS 테이블을 보면 DB_ID, SD_ID, TBL_NAME 이 있습니다. 즉, 먼저 DBS 에서 database 이름으로 DB_ID 값을 찾고, 그 DB_ID와 TBL_NAME을 이용해서 해당 테이블의 SD_ID를 찾을 수 있습니다. 그리고 그 SD_ID를 이용하면 마지막으로 SDS 테이블에서 최종적으로 해당 테이블의 StorageDescriptor 정보를 볼 수 있는 겁니다. 그리고 SDS 의 Location 값이 show create table 에서 볼 수 있는 바로 그값입니다. 그래서 여기서 해당 컬럼의 값을 바꿔버리면… show create table 시에도 값이 바뀌게 됩니다.

당연하지만, SDS 테이블에 데이터가 많은 것은 StorageDescriptor는 테이블마다 하나씩이지만, 당연히 Partition 마다도 존재합니다. 그렇기 때문에 최소한 전체 테이블 수 + 전체 파티션 수 만큼 존재하는게 맞습니다. 🙂

[입 개발] Spark DataFrameWriter에서 saveAsTable 의 동작

s3 에 external table을 만들고 거기에 데이터를 넣는 작업을 하다가 이상한 현상을 경험했습니다.
다음과 같은 테이블을 만들고

create external table(
    id int,
    name varchar(22)
) LOCATION
  's3://bucket/tmp/tmp1';

아래 코드를 돌렸는데…

val tb = spark.sql(
        """
        |SELECT
        |  id, name
        |FROM original
        """
)
tb.write.mode(SaveMode.Overwrite).saveAsTable("tmp1")

당연히 tmp1 테이블은 데이터를 s3에 그리고 external 테이블로 제대로 저장이 될 것을 기대했는데, 아래와 같이, managed 테이블로 바뀌고 저장 위치로 hdfs 로 바뀌는 것이었습니다. -_-(왜왜왜)

create table(
    id int,
    name varchar(22)
) LOCATION
  'hdfs://tmp/blahblah/tmp1';

Spark DataFrameWriter의 saveAsTable 을 SaveMode.Overwrite mode로 사용하게 되면 이런 일이 벌어지게 됩니다. 왜 그런가 해서 소스 코드를 까봤습니다. DataFrameWriter.scala를 보시면 됩니다.

그냥 saveAsTable 소스를 보면 간단합니다. 아래를 보면 SaveMode.Overwrite 가 true 일때… 밑에서 dropTable, createTable 을 부르는군요. 어랏…. dropTable???, 이거 실화인가요? 즉, 이 때 테이블을 날려버립니다. 그리고 createTable로 재생성해줍니다.

  private def saveAsTable(tableIdent: TableIdentifier): Unit = {
    val catalog = df.sparkSession.sessionState.catalog
    val tableExists = catalog.tableExists(tableIdent)
    val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
    val tableIdentWithDB = tableIdent.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString

    (tableExists, mode) match {
       ......
       case (true, SaveMode.Overwrite) =>
        // Get all input data source or hive relations of the query.
        val srcRelations = df.logicalPlan.collect {
          case LogicalRelation(src: BaseRelation, _, _, _) => src
          case relation: HiveTableRelation => relation.tableMeta.identifier
        }

        val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
        EliminateSubqueryAliases(tableRelation) match {
          // check if the table is a data source table (the relation is a BaseRelation).
          case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
            throw new AnalysisException(
              s"Cannot overwrite table $tableName that is also being read from")
          // check hive table relation when overwrite mode
          case relation: HiveTableRelation
              if srcRelations.contains(relation.tableMeta.identifier) =>
            throw new AnalysisException(
              s"Cannot overwrite table $tableName that is also being read from")
          case _ => // OK
        }

        // Drop the existing table
        catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
        createTable(tableIdentWithDB)
        // Refresh the cache of the table in the catalog.
        catalog.refreshTable(tableIdentWithDB)
      ......
    }
  }

그럼 external이 왜 managed가 되는지 살펴보시죠. createTable 코드를 보면 storage.locationUri.isDefined를 보고 EXTERNAL, MANAGED가 결정됩니다.(다른 글에서 쓰겠지만 Spark Sql에서 현재는 alter table 을 이용한 external, managed 변경이 안됩니다. Spark 2.4 기준)

  private def createTable(tableIdent: TableIdentifier): Unit = {
    val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
    val tableType = if (storage.locationUri.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
    }

    val tableDesc = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = new StructType,
      provider = Some(source),
      partitionColumnNames = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec)

    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
  }

해당 값은 buildStorageFormatFromOptions 를 보면 가져옵니다. options map에서 path가 있으면 가져오는 군요.

  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
    val path = CaseInsensitiveMap(options).get("path")
    val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
    CatalogStorageFormat.empty.copy(
      locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
  }

여기서 path가 있으면 그냥 locationUri 에 복사해줍니다. 그럼 이제 위의 문제를 어떻게 해결해야 할까요? storage.locationUri.isDefined 를 true로 만들어주는 방법은, 넵 path option을 설정해 주면 간단하게 해결됩니다.

val tb = spark.sql(
        """
        |SELECT
        |  id, name
        |FROM original
        """
)
tb.write
  .mode(SaveMode.Overwrite)
  .option("path", "s3://bucket/tmp/tmp1")
  .saveAsTable("tmp1")

이렇게 하는 방법말고 만약에 그냥 쉽게 덮어쓰고 싶다면, insertInto 메서드를 쓰셔도 간단하게 해결됩니다.

[입개발] NAGLE 알고리즘과 TCP_CORK

어쩌다보니, 오늘 처음으로…(정말로 처음으로!!!) TCP_CORK라는 옵션에 대해서 찾아보게 되었습니다.(아니 이게 무엇이오 여러분!!!)

TCP_CORK라는 옵션을 설명하기 전에 먼저 TCP_NODELAY라는 옵션이 있습니다. 원래 데이터 전송의 효율성을 취하기 위해서 기본적으로 TCP 전송에 Nagle이라는 알고리즘이 적용되어 있습니다. 대용량 파일을 보낼 때는 유리하지만, 짧은 길이의 데이터를 보낼때는 사실 유용하지 않습니다. 네트웍 관련 서비스를 만들다 보면, 뭔가 응답이, 아무이유 없이, 아주 늦게 가는 케이스를 만나게 되는 경우가 있는데, 실제로, Nagle 알고리즘의 영향을 받아서 늦게 가는 경우가 종종 있습니다.

그러면 먼저 Nagle 알고리즘에 대해서 알아보면, 그냥 어느정도 데이터가 쌓일 때 까지 패킷을 보내지 않고 기다려 놓다가… 일정 사이즈가 되면 보내겠다라는 알고리즘입니다. 동네 버스가, 사람이 적을 때는 출발하지 않고, 몇명 와야 출발하는 것 처럼…(판교역 앞에 계시면 이런 경우를 많이 보시게 됩니다.)

이렇게 모아서 보내면 효율은 좋지만, 먼저 버스에 탄 사람은 인원 수가 모일때까지 가지 못하고, 기다려야 하는 단점이 있습니다. TCP 전송에서도 이게 그대로 발생합니다. 작은 패킷을 보내면 다른 패킷이 추가되어서 특정 사이즈가 되기 전까지는 전달이 되지 않습니다.(정확히는 send timeout 이 되면 전송됩니다.)

예전에 서비스를 운영하다보면, 마지막 2바이트가 몇초 뒤에 전송되어서 항상 문제가 되는 경우가 발생한 적이 있는데, 결국 해당 서비스는 TCP_NODELAY를 적용함으로써 해결했습니다.(또다른 문제의 시작일수도?)

앞에 Nagle을 이렇게 설명한 것을 잘생각해보면 DELAY가 생기는 거고, TCP_NODELAY는 바로 이 Nagle 알고리즘을 끄는 것입니다. 즉 패킷이 들어오면 바로바로 전송하는 거죠. 사실 여기까지가 제가 아는 Nagle 알고리즘이었습니다. 그런데 갑자기 TCP_CORK 가 딱!!!, TCP_CORK 는 Nagle과 유사한 알고리즘(?) 입니다.

https://stackoverflow.com/questions/22124098/is-there-any-significant-difference-between-tcp-cork-and-tcp-nodelay-in-this-use 해당 링크가 잘 설명이 되어 있는데, 요약하면 Nagle은 TCP_CORK의 약화버전이고, Nagle은 ACK를 체크하지만, TCP_CORK는 사이즈만 본다? 라는 뭔가 설명이 있는데…(그렇습니다. 저는 영어가…)

여기서 ACK는 TCP에서도 패킷을 보내고 나면 거기에 대한 ACK를 받게 됩니다. 혼잡제어나, 재전송이나… 그리고 사이즈라… 저 사이즈는 뭘까요. 패킷을 모아보내는 사이즈면 설정가능하지 않을까 하고 찾아보면, 따른 설정은 안보입니다. 네트워크 좀 아시는 분들은 아 그거 단위로 보내겠다고 쉽게 생각하시겠지만… 전 몰라요~~~

그럼 이제 커널 소스를 보면서 간단하게 생각해보도록 하겠습니다. net/ipv4/tcp_output.c 파일을 보면 tcp_write_xmit 라는 함수가 있습니다. 여기서는 다시 tcp_mss_split_point 를 호출합니다.

......
		limit = mss_now;
		if (tso_segs > 1 && !tcp_urg_mode(tp))
			limit = tcp_mss_split_point(sk, skb, mss_now,
						    min_t(unsigned int,
							  cwnd_quota,
							  max_segs),
						    nonagle);

......

tcp_mss_split_point 를 보면 needed 가 버퍼에 존재하는 패킷의 사이즈로 예측이 됩니다. 소켓 버퍼에 있는 사이즈와 window 사이즈 중에 적은게 선택이 됩니다. 그리고 max_len 이 needed 보다 작으면 max_len 이 전송이 되고, 중요한 부분은 partial 은 구하는 것입니다. 모듈러 하는 변수명이 보이시나요? 아까 말한 그 사이즈는 바로 mss_now 인 것입니다. 여기서 partial은 원래 네트웍에서 패킷을 MSS 단위로 보내기 때문에, 모듈러 mss_now 하면, mss가 1024일 때 우리가 600만 보낸다면, 424바이트가 MSS에 모자라기 때문에 partial 은 424 바이트가 됩니다. 코드를 보면 tcp_nagle_check 하고나서 true면 nagle을 적용해야 하는 상황일테니… needed – partial 만큼의 사이즈를 리턴합니다. 즉 MSS 단위로 패킷을 보내도록 짤라준거죠.

static unsigned int tcp_mss_split_point(const struct sock *sk,
					const struct sk_buff *skb,
					unsigned int mss_now,
					unsigned int max_segs,
					int nonagle)
{
	const struct tcp_sock *tp = tcp_sk(sk);
	u32 partial, needed, window, max_len;

	window = tcp_wnd_end(tp) - TCP_SKB_CB(skb)->seq;
	max_len = mss_now * max_segs;

	if (likely(max_len len, window);

	if (max_len packets_out 이 0보다 커야 합니다.(보내는게 있다는 뜻으로...) 그리고 Nagle 알고리즘에서는 마지막으로 minshall 체크라는 걸 합니다. TCP_CORK는 별 다른게 없는데, 아까 Nagle과의 체크에서 ACK를 확인한다는 걸 기억하시나요?


static bool tcp_nagle_check(bool partial, const struct tcp_sock *tp,
			    int nonagle)
{
	return partial &&
		((nonagle & TCP_NAGLE_CORK) ||
		 (!nonagle && tp->packets_out && tcp_minshall_check(tp)));
}

minshall 이라는 분이 계시더군요(먼산…) 이 알고리즘은 그냥 보낸 패킷의 시퀀스와 ACK 받은 패킷의 시퀀스를 비교하기만 합니다. 아래 before 와 after는 그냥 before는 앞에 파라매터가 작으면 true, after는 앞에 파라매터가 크면 true 입니다. 저기서 snd_sml은 보낸 패킷의 시퀀스, snd_una는 ACK 받은 패킷의 시퀀스입니다. “!after(tp->snd_sml, tp->snd_nxt)” 이 코드는 시퀀스가 오버플로우 난 걸 확인하는 걸로 보입니다. 하여튼!!!, 즉 여기서 중요한 것은 ack를 다 받았다면 tp->snd_sml 과 tp->snd_una는 같은 값일 것이므로 false가 리턴됩니다. 즉 tcp_minshall_check가 true는 현재 ack 받아야할 패킷이 더 있다라는 뜻이고 false는 현재 모든 패킷의 ack를 받았다가 됩니다.

static inline bool before(__u32 seq1, __u32 seq2)
{
        return (__s32)(seq1-seq2) snd_sml, tp->snd_una) &&
		!after(tp->snd_sml, tp->snd_nxt);
}

그럼 요약을 하면 TCP_CORK는 이것저것 확인안하고 켜지면 무조건 MSS 단위로만 보내겠다가 됩니다. 그런데 Nagle은 전부 ACK를 받았다면 5 byte만 보낸다고 하더라도… ACK를 모두 받았으므로 tcp_minshall_check 가 false 가 되어서, 패킷이 보내집니다. 요약하면 mss가 1024이고 4100 바이트를 보낸다면 partial = 4100 % 1024 = 4, TCP_CORK에서는 마지막 4바이트는 전송이 되지 않습니다. 언제까지? timeout 이 발생할때까지…, 그러나 Nagle은… ACK를 받을 패킷이 남아있다면 마지막 4바이트가 전송이 안되지만, ACK를 모두 받았다면… 마지막 4바이트도 전송이 되게 됩니다. 이게 두 가지 옵션의 차이이고, Nagle이, TCP_CORK 보다 조금 약한 제약이라는 의미입니다.

[입 개발] spark-submit 시에 –properties-file 와 파라매터에서의 우선 순위

어쩌다보니… 갑자기 SparkSubmit 시에 사용되는 –properties-file(일종의 spark-defaults.conf)와 그냥 파라매터로 넘기는 것의 우선순위가 어떻게 적용되는지가 궁금해 졌습니다. 뭐, 당연히 일반적으로 생각하면 파라매터로 넘기는 것이 분명히 spark-defaults.conf 에 들어가있는 것 보다는 우선이 되는게 당연하겠지라는 생각을 가지고 있었고, 결론부터 말하자면, 이게 맞습니다.(다를 수가 없잖아!!! 퍽퍽퍽)

그러나, 우리는 공돌이니 그래도 명확하게 해두자라는 생각이 들어서, 소스를 가볍게 살펴봤습니다.
실제로 해당 내용은 “core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala” 파일을 살펴보면 들어있습니다. 일단 main 코드는 다음과 같습니다. 여기서는 아주 간단히 확인할 것인데… 이름 부터 이미 parse 와 mergeDefaultSparkProperties 가 있습니다. 우리는 우선순위가 궁금할 뿐이니… parse 에서 가져온 것들을 mergeDefaultSparkProperties 에서 덮어쓸까만 확인하면 됩니다.

  parse(args.asJava)
  // Populate `sparkProperties` map from properties file
  mergeDefaultSparkProperties()
  // Remove keys that don't start with "spark." from `sparkProperties`.
  ignoreNonSparkProperties()
  // Use `sparkProperties` map along with env vars to fill in any missing parameters
  loadEnvironmentArguments()
  useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean
  validateArguments()

parse를 확인해 봅시다. 특별히 중요한 것은 없고 findCliOption 가 넘겨진 opts 중에서 해당 옵션이 있는지 확인하는 코드이고 handle 에서 실제로 해당 값을 셋팅하는 코드가 있습니다.

  protected final void parse(List args) {
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    int idx = 0;
    for (idx = 0; idx 
      val properties = Utils.getPropertiesFromFile(filename)
      properties.foreach { case (k, v) =>
        defaultProperties(k) = v
      }
      // Property files may contain sensitive information, so redact before printing
      if (verbose) {
        Utils.redact(properties).foreach { case (k, v) =>
          logInfo(s"Adding default property: $k=$v")
        }
      }
    }
    defaultProperties
  }

즉 defaultProperties -> sparkProperties 로 저장이 되는 겁니다. 그러면. 실제로 이 값의 우선순위는 어디에 저장이 되는가? 실제로 loadEnvironmentArguments 에서 해당 값이 설정이 됩니다. 아래에 보시면 Option에 먼저 executorMemory 가 NULL 이면 orElse 로 아까 저장한 sparkProperties 에서 가져오고 그래도 없으면 환경 변수에서 가져오고, 그래도 없으면 Null이 리턴됩니다.

  private def loadEnvironmentArguments(): Unit = {
    ......
    executorMemory = Option(executorMemory)
      .orElse(sparkProperties.get(config.EXECUTOR_MEMORY.key))
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orNull
    ......
  }

마지막으로 정리하면 결국 우선순위는 다음과 같습니다.

  1. 파라매터로 전달함 –executor-memory 이런식으로
  2. properties-file 로 저장한 값
  3. 환경변수

그런데 무조건 되는가에 대한 고민을 더 하셔야 합니다. 예를 들어 파라매터로 넘길 수 있는 것이 100%는 아닙니다. 다른 설정이 spark 설정 파일에 있을 수 가 있는 거죠. 즉 spark.yarn.executor.memoryOverhead 이런 값이 spark 설정 파일에 있다면, 여전히 이것 때문에 문제가 발생할 수 있다라는 것을 알아야 합니다.