pika是360开源的一款使用rocksdb作为底层存储的kv数据库,协议层走的redis协议,所以也可以看做一款将数据持久化入rocksdb的”redis”。

pika的代码还是比较清晰的,下面简单的剖析一下它的实现架构。

线程模型

首先先来看一下线程模型,只有把核心的骨架摸透,整个流程才能了然于胸。我们简单的梳理一下核心的几个线程。

Dispatch thread

分发线程,主要处理网络IO,分为监听所有连接的ServerThread与连接IO处理线程WorkerThread,它们以底层库的形式存在于pink这个静态库中。

这是一套比较经典的网络模型了,每个线程使用一个epoll fd来监听事件,ServerThread用于监听listen fd并且accept连接,然后根据一些策略丢入WorkerThread中进行处理。

WorkerThread获取到ServerThread建立的fd之后,处理IO事件。

当然这个IO事件和DB普通的应答模式来说,不太一样,redis协议中有pipeline模式,该模式简单的来说,就是客户端不必发送每个请求之后都读取回应,可以发送一批请求后一次性的读取出来。所以这块的IO处理还有些特别。

Thread pool

除了以上两个网络处理线程之外,还有一个线程池模块,主要用于处理网络模块中解析完的请求,然后进行处理,处理完毕后继续丢入网络模块进行发送。

看完了几个核心的线程,我们来梳理一下核心的处理代码,首先是读取请求并丢入线程池的部分:

        if ((pfe->mask & EPOLLOUT) && in_conn->is_reply()) {
          WriteStatus write_status = in_conn->SendReply();
          in_conn->set_last_interaction(now);
          if (write_status == kWriteAll) {
            pink_epoll_->PinkModEvent(pfe->fd, 0, EPOLLIN);
            in_conn->set_is_reply(false);
          } else if (write_status == kWriteHalf) {
            continue;
          } else {
            should_close = 1;
          }
        }

        if (!should_close && (pfe->mask & EPOLLIN)) {
          ReadStatus read_status = in_conn->GetRequest();
          in_conn->set_last_interaction(now);
          if (read_status == kReadAll) {
            pink_epoll_->PinkModEvent(pfe->fd, 0, 0);
            // Wait for the conn complete asynchronous task and
            // Mod Event to EPOLLOUT
          } else if (read_status == kReadHalf) {
            continue;
          } else {
            should_close = 1;
          }
        }

主要看第二个if语句块。在这里假设fd可读,则解析请求,当获取到了完整的请求之后,则会将请求丢入线程池:

void PikaClientConn::AsynProcessRedisCmds(const std::vector<pink::RedisCmdArgsType>& argvs, std::string* response) {
  BgTaskArg* arg = new BgTaskArg();
  arg->redis_cmds = argvs;
  arg->response = response;
  arg->pcc = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
  g_pika_server->Schedule(&DoBackgroundTask, arg);
}

Schedule方法就是投入线程池的消息队列并通过条件变量来唤醒工作线程来获取请求。

当线程池处理完毕之后,会丢回IO线程中:

void PikaClientConn::BatchExecRedisCmd(const std::vector<pink::RedisCmdArgsType>& argvs, std::string* response) {
  bool success = true;
  for (const auto& argv : argvs) {
    if (DealMessage(argv, response) != 0) {
      success = false;
      break;
    }
  }
  if (!response->empty()) {
    set_is_reply(true);
    NotifyEpoll(success);
  }
}

NotifyEpoll通知IO线程,将对应fd的EPOLLOUTEPOLLIN注册上去,然后当fd可写的时候发送回应。该代码片段在丢入线程池那里有,这里有一个比较巧妙的处理,当处于pipeline模式的时候,假设fd的写缓冲区写满了,然后这时候又有新请求过来了,那么会将该fd的所有事件取消,然后线程池处理请求,注意这里假设不取消所有事件,则在fd可写的时候,会造成回应数据被多个线程写,引发各种不可预测的结果。

先简单的介绍一下程序的线程架构,后面有时间再写一下blackwidow对于rocksdb的封装,用kv来模拟redis中各种数据格式。

共 0 条回复
暂时没有人回复哦,赶紧抢沙发
发表新回复

作者

sryan
today is a good day