MongoDB 소스로 살펴본 Replica Set 방식에서의 FailOver – 1

MongoDB는 10gen에서 만든 가장 인기있는 NoSQL 중의 하나입니다. 간단하게 그 인기를 소개하자면 NoSQL계의 MySQL이라고 할까요?

그리고 대부분의 이런 종류는 FailOver를 위한 기능을 제공합니다. MongoDB는 Master-Slave 방식과 Replica Set이라는 두 가지 방식을 제공하는데, 대부분 Replica Set을 이용합니다.

그럼 이 Replica Set에서 실제 장애가 났을 때 어떻게 FailOver 되는지가 궁금하지 않으신가요?

그렇죠. 거기 머리 크신 분… 너무너무 궁금하다구요. 안 알려주면 직접 찾아오시겠다구요? 찾아오실 때는 양손을 무겁게 먹을것으로(퍽… 돌만 가득…) 이번에는 실제로 MongoDB에서 어떻게 Replica Set에서 FailOver를 하게 되는지 살짝 살펴보고 소스에서는 그 부분이 어디인지, 살펴보도록 하겠습니다.

먼저 FailOver 가 되기 전에, 이와 밀접한 관계가 있는 Replication을 살펴보도록 하겠습니다.

repl.cpp 파일을 보면 startReplication 함수에서 replication이 시작하게 됩니다.

void startReplication() {
//replSet이 설정되어 있을 때, master나 slave가 설정되면 오류를 표시하고 종료합니다.
//즉 우리는 여기서, replSet 과 master/slave 는 MongoDB에서 공존할 수 없다 라는 것을 알 수 있습니다.</pre>
if( !cmdLine._replSet.empty() ) {
 if( replSettings.slave || replSettings.master ) {
 log() << "***" << endl;
 log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl;
 log() << "***" << endl;
 }
//newRepl()을 호출하고, startReplSets 라는 Thread를 동작 시킵니다.
//newRepl은 단순히 replSettings.master 를 true로 셋팅하고 _logOp를 _logOpUninitialized로 설정합니다.
newRepl();

replSet = true;
 ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);
 boost::thread t( boost::bind( &startReplSets, replSetCmdline) );

return;
 }

oldRepl();

/* this was just to see if anything locks for longer than it should -- we need to be careful
 not to be locked when trying to connect() or query() the other side.
 */
 //boost::thread tempt(tempThread);

if( !replSettings.slave && !replSettings.master )
 return;

{
 dblock lk;
 replLocalAuth();
 }

if ( replSettings.slave ) {
 assert( replSettings.slave == SimpleSlave );
 log(1) << "slave=true" << endl;
 boost::thread repl_thread(replSlaveThread);
 }

if ( replSettings.master ) {
 log(1) << "master=true" << endl;
 replSettings.master = true;
 createOplog();
 boost::thread t(replMasterThread);
 }

while( replSettings.fastsync ) // don't allow writes until we've set up from log
 sleepmillis( 50 );
 }

이제 rs.cpp 의 startReplSets 함수를 따라가 봅니다.

    void startReplSets(ReplSetCmdline *replSetCmdline) {
        Client::initThread("rsStart");
        try {
            assert( theReplSet == 0 );
            if( replSetCmdline == 0 ) {
                assert(!replSet);
                return;
            }
            replLocalAuth();
            //replication에 사용되는 중요한 전역변수 theReplSet을 생성합니다.
            //그리고 theReplSet 의 go 를 호출합니다.
            (theReplSet = new ReplSet(*replSetCmdline))->go();
        }
        catch(std::exception& e) {
            log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
            if( theReplSet )
                theReplSet->fatal();
        }
        cc().shutdown();
    }

_go 함수 안에서 loadLastOpTimeWritten 을 호출해서 가장 최신의 Operation Time 을 가져옵니다.

    /* call after constructing to start - returns fairly quickly after launching its threads */
    void ReplSetImpl::_go() {
        try {
            loadLastOpTimeWritten();
        }
        catch(std::exception& e) {
            log() << "replSet error fatal couldn't query the local " << rsoplog << " collection.  Terminating mongod after 30 seconds." << rsLog;
            log() << e.what() << rsLog;
            sleepsecs(30);
            dbexit( EXIT_REPLICATION_ERROR );
            return;
        }

        changeState(MemberState::RS_STARTUP2);
       //실제 FailOver를 위한 heartbeat 스레드 생성 및 replication 스레드 생성
        startThreads();
        newReplUp(); // oplog.cpp
    }

이제 startThread로 넘어가 보도록 하겠습니다.

    /** called during repl set startup.  caller expects it to return fairly quickly.
        note ReplSet object is only created once we get a config - so this won't run
        until the initiation.
    */      
    void ReplSetImpl::startThreads() {
        task::fork(mgr);
    //여기가 실제 Fail을 확인하기 위한 heartbeat를 하는 msgCheckNewState를 호출합니다.)
    //결국 heartbeat 의 결과는 Manager::msgCheckNewState 가 처리하게 됩니다. 이게 핵심
        mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
        
    //여기서 replication이 시작됩니다.
        boost::thread t(startSyncThread);
    
        task::fork(ghost);
    
        // member heartbeats are started in ReplSetImpl::initFromConfig
    }

이제 갑자기 코드가 어려워집니다. msgCheckNewState는 health thread 에 체크한 결과로 Primary를 뽑아야 할 때, 호출됩니다.

    /** called as the health threads get new results */
    void Manager::msgCheckNewState() {
        {
            theReplSet->assertValid();
            rs->assertValid();

            RSBase::lock lk(rs);

            if( busyWithElectSelf ) return;

            checkElectableSet();
            checkAuth();

            const Member *p = rs->box.getPrimary();
            if( p && p != rs->_self ) {
                if( !p->hbinfo().up() ||
                        !p->hbinfo().hbstate.primary() ) {
                    p = 0;
                    rs->box.setOtherPrimary(0);
                }
            }

            const Member *p2;
            {
                bool two;
                p2 = findOtherPrimary(two);
                if( two ) {
                    /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
                    log() << "replSet info two primaries (transiently)" << rsLog;
                    return;
                }
            }

            if( p2 ) {
                /* someone else thinks they are primary. */
                if( p == p2 ) {
                    // we thought the same; all set.
                    return;
                }
                if( p == 0 ) {
                    noteARemoteIsPrimary(p2);
                    return;
                }
                // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
                if( p != rs->_self ) {
                    // switch primary from oldremotep->newremotep2
                    noteARemoteIsPrimary(p2);
                    return;
                }
                /* we thought we were primary, yet now someone else thinks they are. */
                if( !rs->elect.aMajoritySeemsToBeUp() ) {
                    /* we can't see a majority.  so the other node is probably the right choice. */
                    noteARemoteIsPrimary(p2);
                    return;
                }
                /* ignore for now, keep thinking we are master.
                   this could just be timing (we poll every couple seconds) or could indicate
                   a problem?  if it happens consistently for a duration of time we should
                   alert the sysadmin.
                */
                return;
            }

            /* didn't find anyone who wants to be primary */
            if( p ) {
                /* we are already primary */

                if( p != rs->_self ) {
                    rs->sethbmsg("error p != rs->self in checkNewState");
                    log() << "replSet " << p->fullName() << rsLog;
                    log() << "replSet " << rs->_self->fullName() << rsLog;
                    return;
                }

                if( rs->elect.shouldRelinquish() ) {
                    log() << "can't see a majority of the set, relinquishing primary" << rsLog;
                    rs->relinquish();
                }

                return;
            }

            if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary
                OCCASIONALLY log() << "replSet I don't see a primary and I can't elect myself" << endl;
                return;
            }

            /* no one seems to be primary.  shall we try to elect ourself? */
            if( !rs->elect.aMajoritySeemsToBeUp() ) {
                static time_t last;
                static int n;
                int ll = 0;
                if( ++n > 5 ) ll++;
                if( last + 60 > time(0 ) ) ll++;
                log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog;
                last = time(0);
                return;
            }

            if( !rs->iAmElectable() ) {
                return;
            }

            busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one.
        }
        try {
            rs->elect.electSelf();
        }
        catch(RetryAfterSleepException&) {
            /* we want to process new inbounds before trying this again.  so we just put a checkNewstate in the queue for eval later. */
            requeue();
        }
        catch(...) {
            log() << "replSet error unexpected assertion in rs manager" << rsLog;
        }
        busyWithElectSelf = false;
    }

작년에 쓴 글인데, 일단은 살짝 정리개념으로 포스팅 합니다. -_- 뒤에를 더 분석해야 제대로된 건데 말이죠. 일단은 1로 하고 2는 곧 다시 정리하도록 하겠습니다.