当执行到listen()方法后(如下): void listen(int port) { //testTheDb(); log() << "waiting for connections on port " << port << endl; OurListener l(cmdLine.bind_ip, port); l.setAsTimeTracker(); //启动复制 startReplication(); if ( !noHttpInterface ) boost::thread web( boost::bind(&webServerThread, new RestAdminAccess() /* takes ownership */)); ..... }
mongodb会紧跟着执行repl.cpp文件中的startReplication()方法,如下: //repl.cpp void startReplication() { /* if we are going to be a replica set, we aren't doing other forms of replication. */ if( !cmdLine._replSet.empty() ) { //如果使用分布集群,则不允许带slave或master选项 if( replSettings.slave || replSettings.master || replPair ) { log() << "***" << endl; log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl; log() << "***" << endl; } //绑定函数,oplog.cpp ->_logOpRS(), 用于处理replset类型的oplog操作 newRepl(); return; } //绑定函数,oplog.cpp ->_logOpOld(), 用于处理master-slave类型的oplog操作 oldRepl(); /* this was just to see if anything locks for longer than it should -- we need to be careful not to be locked when trying to connect() or query() the other side. */ //boost::thread tempt(tempThread); //当设置参数不正确时(非slave,master且不是replPair) if( !replSettings.slave && !replSettings.master && !replPair ) return; { dblock lk; cc().getAuthenticationInfo()->authorize("admin"); pairSync->init(); } //如果是slave,则开启相关访问(master server)线程 if ( replSettings.slave || replPair ) { if ( replSettings.slave ) { assert( replSettings.slave == SimpleSlave ); log(1) << "slave=true" << endl; } else replSettings.slave = ReplPairSlave; //构造并启动线程方法replSlaveThread boost::thread repl_thread(replSlaveThread); } //如果是master,则构造并启动线程方法replMasterThread if ( replSettings.master || replPair ) { if ( replSettings.master ) log(1) << "master=true" << endl; replSettings.master = true; createOplog();//构造oplog集合“local.oplog.$main” boost::thread t(replMasterThread); } // don't allow writes until we've set up from log while( replSettings.fastsync ) sleepmillis( 50 ); }
上面方法在完成必要的参数分析检查之后,就会根据slave的配置信息来构造启动线程方法replSlaveThread,如下: //repl.cpp void replSlaveThread() { sleepsecs(1); Client::initThread("replslave"); cc().iAmSyncThread(); { dblock lk; //获取认证信息(mongodb支持使用安全认证不管哪种replicate方式, //只要在master/slave中创建一个能为各个database认识的用户名/密码即可) cc().getAuthenticationInfo()->authorize("admin"); ...... } while ( 1 ) { try { //repl主函数 replMain(); if ( debug_stop_repl ) break; sleepsecs(5);//休眠5秒 } catch ( AssertionException& ) { ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry"); problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; sleepsecs(300); } } }
上面方法中有mongodb进行认证的逻辑,之后就会用一个while(1)来循环执行(注:每5秒执行一次)replMain()方法,如下: //repl.cpp void replMain() { ReplSource::SourceVector sources; while ( 1 ) { int s = 0; { dblock lk; //复制失败 if ( replAllDead ) {//该标识符会在同步出现异常如(out of sync)时为true //autoresync:自动地重新执行完整的同步,如果这个从结点脱离了与主结点的同步。 if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( "auto" ) ) break; } // i.e., there is only one sync thread running. // we will want to change/fix this. assert( syncing == 0 ); syncing++; } try { int nApplied = 0; //repl主函数 s = _replMain(sources, nApplied); if( s == 1 ) { if( nApplied == 0 ) s = 2; else if( nApplied > 100 ) { // sleep very little - just enough that we aren't truly hammering master sleepmillis(75); s = 0; } } } catch (...) { out() << "caught exception in _replMain" << endl; s = 4; } ...... }
上面代码又是一个while(1)循环,它会判断当前slave是否处于“out of sync”状态,如果是,但未开启autoresync时,则复制将被挂起。否则会执行_replMain方法来进行同步,如下: //repl.cpp int _replMain(ReplSource::SourceVector& sources, int& nApplied) { { ReplInfo r("replMain load sources"); dblock lk; ReplSource::loadAll(sources);//绑定相应的master sources,以便后面遍历同步 /*fastsync: 从主结点的快照中启动一个从结点。与完整的同步相比, 这个选项允许一个从结点更快地启动,如果它的数据目录已经使用主结点的快照进行初始化。*/ replSettings.fastsync = false; // 初始重置时需要该参数 } ..... int sleepAdvice = 1; for ( ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++ ) { ReplSource *s = i->get(); int res = -1; try { res = s->sync(nApplied); //是否有其它要同步的数据库 bool moreToSync = s->haveMoreDbsToSync(); if( res < 0 ) { sleepAdvice = 3; } else if( moreToSync ) { sleepAdvice = 0; } else if ( s->sleepAdvice() ) { sleepAdvice = s->sleepAdvice(); } else sleepAdvice = res; if ( res >= 0 && !moreToSync /*&& !s->syncedTo.isNull()*/ ) { pairSync->setInitialSyncCompletedLocking(); } } ...... } return sleepAdvice; }
上面的_replMain()方法首先会从“local.sources”数据集中获取主(master)节点的信息。这里解释一下,mongod从结点在启动时会将启动参数(--source)中的信息保存到"local.sources"数据集中,并通过此函数(loadAll())中加载到一个集合对象中。这里我们看一下其加载代码,如下: /* slave: pull some data from the master's oplog note: not yet in db mutex at this point. @return -1 error 0 ok, don't sleep 1 ok, sleep */ //repl.cpp void ReplSource::loadAll(SourceVector &v) { Client::Context ctx("local.sources"); SourceVector old = v; v.clear(); ...... //遍历local.sources,并将其中的replsource绑定到SourceVector中 shared_ptr
完成了source的加载之后,就可以开始对source进行同步了,即如下代码: int ReplSource::sync(int& nApplied) { _sleepAdviceTime = 0; ReplInfo r("sync"); if ( !cmdLine.quiet ) { Nullstream& l = log(); l << "repl: from "; if( sourceName() != "main" ) { l << "source:" << sourceName() << ' '; } l << "host:" << hostName << endl; } nClonedThisPass = 0; // 对于localhost这类本机地址,则不能同步 //FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName. if ( (string("localhost") == hostName || string("127.0.0.1") == hostName) && cmdLine.port == CmdLine::DefaultDBPort ) { log() << "repl: can't sync from self (localhost). sources configuration may be wrong." << endl; sleepsecs(5); return -1; } //oplogReader链接时出现异常 if ( !oplogReader.connect(hostName) ) { log(4) << "repl: can't connect to sync source" << endl; if ( replPair && paired ) { assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) ); replPair->arbitrate();//paired模式下同级结点进行仲裁 } return -1; } if ( paired ) { int remote = replPair->negotiate(oplogReader.conn(), "direct"); int nMasters = ( remote == ReplPair::State_Master ) + ( replPair->state == ReplPair::State_Master ); //主库不为1时,则异常 if ( getInitialSyncCompleted() && nMasters != 1 ) { log() << ( nMasters == 0 ? "no master" : "two masters" ) << ", deferring oplog pull" << endl; return 1; } } ...... return sync_pullOpLog(nApplied);//根据oplog日志进行同步 }
上面方法先进行一些异常判断,比如source为localhost地址,以及无法链接(oplogReader.connect)等情况。之后,调用sync_pullOpLog方法,从master上获取oplog来同步数据信息,如下: //repl.cpp 获取主库的oplog数据 int ReplSource::sync_pullOpLog(int& nApplied) { int okResultCode = 1; //获取主库中oplog的名空间信息 string ns = string("local.oplog.$") + sourceName(); log(2) << "repl: sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; bool tailing = true; oplogReader.tailCheck(); if ( replPair && replPair->state == ReplPair::State_Master ) { dblock lk; idTracker.reset(); } OpTime localLogTail = _lastSavedLocalTs; //是否初始化 bool initial = syncedTo.isNull(); //无游标支持或要初始化时 if ( !oplogReader.haveCursor() || initial ) { if ( initial ) { //显示数据库信息之前,获取最新(last)的 oplog日志时间戳. syncToTailOfRemoteLog(); BSONObj info; bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info ); massert( 10389 , "Unable to get database list", ok ); BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); while( i.moreWithEOO() ) { BSONElement e = i.next(); if ( e.eoo() ) break; string name = e.embeddedObject().getField( "name" ).valuestr(); if ( !e.embeddedObject().getBoolField( "empty" ) ) { if ( name != "local" ) { if ( only.empty() || only == name ) { log( 2 ) << "adding to 'addDbNextPass': " << name << endl; addDbNextPass.insert( name ); } } } } dblock lk; save(); } //本次同步的日志时间戳(gte:大于或等于) BSONObjBuilder q; q.appendDate("$gte", syncedTo.asDate()); BSONObjBuilder query; query.append("ts", q.done()); if ( !only.empty() ) { // note we may here skip a LOT of data table scanning, a lot of work for the master. query.appendRegex("ns", string("^") + only); // maybe append "\\." here? } BSONObj queryObj = query.done(); // e.g. 
queryObj = { ts: { $gte: syncedTo } } //tailingQuery查询,并绑定其相应游标信息 oplogReader.tailingQuery(ns.c_str(), queryObj); tailing = false; } else { log(2) << "repl: tailing=true\n"; } //如果依旧无游标信息,可能链接被关闭,则尝试重置链接 if( !oplogReader.haveCursor() ) { problem() << "repl: dbclient::query returns null (conn closed?)" << endl; oplogReader.resetConnection(); return -1; }
// show any deferred(延期,搁置)database creates from a previous pass { set
//将获取到的oplog操作信息应用到“从结点(slave)”上 //apply operations { int n = 0; time_t saveLast = time(0); while ( 1 ) { bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place. if ( moreInitialSyncsPending || !oplogReader.more() ) { dblock lk; OpTime nextLastSaved = nextLastSavedLocalTs(); { dbtemprelease t; if ( !moreInitialSyncsPending && oplogReader.more() ) { if ( getInitialSyncCompleted() ) { // if initial sync hasn't completed, break out of loop so we can set to completed or clone more dbs continue; } } else { setLastSavedLocalTs( nextLastSaved ); } } if( oplogReader.awaitCapable() && tailing ) okResultCode = 0; // don't sleep //syncedTo设置时间戳,保存从结点的更新时间,用于下次从结点需要查询新的oplog时, //它使用“syncedTo”获得它需要的新的oplog,或者用于发现它已经out-of-sync。 syncedTo = nextOpTime; save(); // note how far we are synced up to now log() << "repl: applied " << n << " operations" << endl; nApplied = n; log() << "repl: end sync_pullOpLog syncedTo: " << syncedTo.toStringLong() << endl; break; } else { } ..... BSONObj op = oplogReader.next(); unsigned b = replApplyBatchSize; bool justOne = b == 1; scoped_ptr
上面方法代码比较长,首先它会初始化要链接的master的oplog名空间信息,之后尝试链接到master上以获取相应的cursor id信息。该cursor id用于标识当前slave访问master时所使用的cursor(因为master中的oplog cursor会在返回oplog信息时不关闭,它支持下次slave链接使用该cursor时从相应的pos位置继续获取。这块内容可以参见这篇文章 Mongodb源码分析--Replication之主从模式--Master). 如果cursorid正常,则使用oplogReader对象来获取相应的oplog信息(注:oplogReader为mongodb为了查询oplog而声明的一个类,详情参见其源码文件oplogreader.h) 除此以外,上面方法还会设置本地的syncedTo来记录下次同步时使用的时间戳。同时根据本地保存的时间戳与oplogReader所返回的master oplog信息中的ts进行比较,以判断是否出现out of sync的情况,进而执行resync操作。有关这类逻辑说明参见上面的代码注释即可。 当获取的master oplog符合(本地时间戳)要求时,则将oplog对象进行分解,进而将其中的数据对象保存到本地并执行持久化操作(上面代码段:getDur().commitIfNeeded())。这里要说明的是,将oplog进行分解时的逻辑在sync_pullOpLog_applyOperation方法中完成,因为这里牵扯到oplog的数据结构,由于篇幅所限,这部分内容我会在后面章节中进行说明,我们只要知道在分解结束后,最终调用如下方法将数据放到本机上: //repl.cpp void ReplSource::applyOperation(const BSONObj& op) { //oplog.cpp void applyOperation_inlock(const BSONObj& op , bool fromRepl ) { OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters; if( logLevel >= 6 ) log() << "applying op: " << op << endl; assertInWriteLock(); OpDebug debug; BSONObj o = op.getObjectField("o");//实际操作的document const char *ns = op.getStringField("ns"); // operation type -- see logOp() comments for types const char *opType = op.getStringField("op"); if ( *opType == 'i' ) {//插入 opCounters->gotInsert(); const char *p = strchr(ns, '.'); if ( p && strcmp(p, ".system.indexes") == 0 ) { // updates aren't allowed for indexes -- so we will do a regular insert. if index already // exists, that is ok. theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize()); } else { // do upserts for inserts as we might get replayed more than once BSONElement _id; if( !o.getObjectID(_id) ) { /* No _id. This will be very slow. 
*/ Timer t; updateObjects(ns, o, o, true, false, false , debug ); if( t.millis() >= 2 ) { RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl; } } else { BSONObjBuilder b; b.append(_id); RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow updateObjects(ns, o, b.done(), true, false, false , debug ); } } } else if ( *opType == 'u' ) {//更新 opCounters->gotUpdate(); RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug ); } else if ( *opType == 'd' ) {//删除 opCounters->gotDelete(); if ( opType[1] == 0 ) deleteObjects(ns, o, op.getBoolField("b")); else assert( opType[1] == 'b' ); // "db" advertisement } else if ( *opType == 'n' ) {//无操作 // no op } else if ( *opType == 'c' ) {//command opCounters->gotCommand(); BufBuilder bb; BSONObjBuilder ob; _runCommands(ns, o, bb, ob, true, 0); } else { stringstream ss; ss << "unknown opType [" << opType << "]"; throw MsgAssertionException( 13141 , ss.str() ); } }
到这里,关于如何进行crud的操作,就与我之前的几篇文章的内容关联上了,大家回顾一下即可。最后用一张时序图来回顾一下slave的执行流程: 好了,今天的内容到这里就告一段落了。 原文链接:http://www.cnblogs.com/daizhj/archive/2011/06/20/mongodb_sourcecode_repl_slave_run.html 作者: daizhj, 代震军 微博: http://t.sina.com.cn/daizhj Tags: mongodb,c++,Replica,master-slave
最新评论