
ceph源码分析之读写操作流程(2).doc
7页ceph 源码分析之读写操作流程( 2)上一篇介绍了 ceph 存储在上两层的消息逻辑, 这一篇主要介绍一下读写操作在底两层的流程下图是上一篇消息流程的一个总结 上在 ceph 中,读写操作由于分布式存储的原因,故走了不同流程对于读操作而言:1.客户端直接计算出存储数据所属于的主 osd,直接给主 osd上发送消息2.主 osd 收到消息后,可以调用 Filestore 直接读取处在底层文件系统中的主 pg 里面的内容然后返回给客户端具体调用函数在 ReplicatedPG::do_osd_ops 中实现读操作代码流程如图:如我们之前说的,当确定读操作为主 osd 的消息时( CEPH_MSG_OSD_OP 类型),会调用到ReplicatePG::do_osd_op 函数,该函数对类型做进一步判断,当发现为读类型( CEPH_OSD_OP_READ )时 ,会调用FileStore 中的函数对磁盘上数据进行读[cpp] view plain copy int ReplicatedPG::do_osd_ops(OpContext*ctx, vector<OSDOp>& ops){switch (op.op) {caseCEPH_OSD_OP_READ:++ctx->num_read;{// read into a bufferbufferlistbl; int r = osd->store->read(coll, soid, // 调用FileStore::read从底层文件系统读取}caseCEPH_OSD_OP_WRITE:++ctx->num_write; { //写操作只是做准备工作,并不实际的写 } } } FileStore::read函数是底层具体的实现,会通过调用系统函数如::open,::pread,::close 等函数来完成具体的操作。
[cpp] viewplain copy int FileStore::read( coll_t cid, constghobject_t& oid, uint64_t offset, size_t len,bufferlist& bl, bool allow_eio) {int r = lfn_open(cid, oid, false, &fd);got = safe_pread(**fd, bptr.c_str(), len, offset);//FileStore::safe_pread 中调用了 ::preadlfn_close(fd); } 而对于写操作而言,由于要保证数据写入的同步性就会复杂很多:1.首先客户端会将数据发送给主 osd,2.主 osd 同样要先进行写操作预处理,完成后它要发送写消息给其他的从 osd,让他们对副本 pg 进行更改,3.从 osd 通过 FileJournal 完成写操作到 Journal 中后发送消息告诉主 osd 说完成,进入 54.当主 osd 收到所有的从 osd 完成写操作的消息后,会通过FileJournal 完成自身的写操作到 Journal 中。
完成后会通知客户端,已经完成了写操作5.主 osd,从 osd 的线程开始工作调用 Filestore 将 Journal 中的数据写入到底层文件系统中在介绍写操作的流程前,需要先介绍一下 ceph 中的 callback 函数Context 类定义在 src/include 文件中,该类是一个回调函数类的抽象类,继承它的类只要在子类实现它的 finish 函数,在finish 函数调用自己需要回调的函数,就可以完成回调[cpp] view plain copy class Context { Context(constContext& other); const Context&operator=(const Context& other); protected:virtual void finish(int r) = 0; public: Context() {}virtual ~Context() {} // we want a virtualdestructor!!! virtual void complete(int r) { finish(r);delete this; } }; Finisher 类是在 src/common 中定义的一个专门查看操作是否结束的一个类。
在这个类里面拥有一个线程 finisher_thread 和一个类型为 Context 指针的队列finisher_queue当一个操作线程完成自己的操作后,会将Context 类型对象送入队列此时 finisher_thread 线程循环监视着自己的 finisher_queue 队列,当发现了有新进入的Context 时,会调用这个 Context::complete 函数,这个函数则会调用到 Context 子类自己实现的 finish 函数来处理操作完成后的后续工作[cpp] view plain copy class Finisher {CephContext*cct;vector<Context*>finisher_queue;void*finisher_thread_entry();struct FinisherThread : publicThread {Finisher *fin;FinisherThread(Finisher *f) : fin(f) {}void* entry(){ return (void*)fin->finisher_thread_entry(); }}finisher_thread;}void*Finisher::finisher_thread_entry(){while(!finisher_stop){while(!finisher_queue.empty()){vector<Context*> lsls.swap(finisher_queue);for(vector<Context*>::iterator p = ls.begin();p != ls.end();++p) {if(*p) {//这里面调用 Context 子类实现的 finish函数(*p)->complete(0);}}}}}在写操作中涉及了多个线程和消息队列的协同工作,需要注意的是一个类拥有一个 Finisher成员时,以为着它同时获得了一个队列和一个执行线程。
OSD 中处理读写操作是线程池和消息队列 (有很多, 其他暂时不讨论):[cpp] view plain copy ThreadPool op_tp;ThreadPool::WorkQueueVal<pair<PGRef,OpRequestRef>, PGRef> &op_wq;FileJournal 中拥有的线程和消息队列:[cpp] view plain copy Write write_thread;deque<write_item> writeq;其父类 Journal 中拥有线程和消息队列(引用自之前说的JournalingObjectStore 类中):[cpp] view plain copy Finisher finisher_thread;FileStore 中拥有的线程和消息队列:[cpp] view plain copy ThreadPool op_tp;op_wq;//Filestore 中实现,继承自ThreadPool::WorkQueue<OpSequencer>ondisk_finisher; Finisher op_finisher;OpWQFinisher前一章说过在前两层, OSD 根据不同的消息类型,选择了主 OSD 处理和从 OSD 的处理,以下介绍的写流程,就是在收到具体写操作以后,本地 OSD 开始的工作。
写的逻辑流程图如图:从图中我们可以看到写操作分为以下几步:1.OSD::op_tp 线程从 OSD::op_wq 中拿出来操作如本文开始的图上描述,具体代码流是 ReplicatePG::apply_repop 中创建回调类 C_OSD_OpCommit 和 C_OSD_OpAppliedFileStore::queue_transactions 中创建了回调类C_JournaledAhead2.FileJournal::write_thread 线程从 FileJournal::writeq 中拿出来操作,主要就是写数据到具体的 journal 中,具体代码流:3.Journal::Finisher.finisher_thread 线程从Journal::Finisher.finish_queue 中拿出来操作,通过调用C_JournalAhead 留下的回调函数 FileStore:_journaled_ahead,该线程开始工作两件事: 首先入底层 FileStore::op_wq 通知开始写,再入 FileStore::ondisk_finisher.finisher_queue 通知可以返回。
具体代码流: 4.FileStore::ondisk_finisher.finisher_thread线程从 。
