caffe-parallel通过参数服务器的方式对caffe进行了多机数据并行,底层通信采用MPI。下面记录一些看源码过程中的一些笔记,以免自己忘记。
caffe-parallel主要修改了solver.cpp net.cpp data_layer.cpp等几个文件。采用多线程的方式实现了参数服务器。0号处理器为参数服务器,有n个线程(n表示共有n个核),其中n-1个ComputeValueThreadClient线程分别对应其余n-1个核,用于异步的接收n-1个核上传来的参数,并向他们发送数据。剩余的n-1个核都是client。1个ComputeValueThreadServer线程用于更新全局参数。
server上维护一个idleQ队列,存放当前空闲的client,server将每一个迭代分发给了client,从idleQ中取出一个空闲client。
client负责一次迭代的forwardbackward,这部分代码没有变,因为caffe-parallel实现的是数据并行,每一个client的计算任务是独立的,并将新的参数异步通信的发送给server,server的ComputeValueThreadClient线程接收到新参数之后会将参数存到tempDiff[tid]下(getValue函数),注意client发送给server的是两次参数的diff而不是新参数,这样可以利用压缩手段节约通信带宽,实际上caffe-parallel没有进行这部分优化。参数存完之后flagCC被标记为1,++upNum,表示这个线程对应的client发送来了新参数。接下来线程在ComputeValueClient的如下代码位置进行等待,直到ComputeValueThreadServer阶段性的更新完全局参数,broadcast出condCtrl条件变量。然后将新参数的diff发送个对应的client,并将该client压入idleQ队列。此时的client将处在等待接收新参数以及接收一个新的迭代步的阻塞中。
lockmutex lockm(&mutexData);
while(upNum!=0){
    pthread_cond_wait(&condCtrl,&mutexData);
    break;
}
flagCC[tid]=0;server上的ComputeValueThreadServer会采用pthread_cond_timedwait的方式等待wait_time秒的时间,如果所有client都上传了参数或者等待时间超过,则调用ComputeValueServer()函数对全局参数进行更新。并将diff拷贝到tempDiff中,upNum=0。具体实现代码如下
lockmutex lockm(&mutexData);
while(upNum < upSum){
    timeoutret=pthread_cond_timedwait(&condUp,&mutexData,&wait_time);
    if(timeoutret==ETIMEDOUT){
        LOG(INFO)<<"time out " << upNum;
        break;
    }
}
if(upNum>0){
    layer->ComputeValueServer();
    pthread_cond_broadcast(&condCtrl);
}其中ComputeValueServer()函数调用了ComputeUpdateValueServerThread函数,进行全局参数的更新,并调用net->Update()对网络进行更新。
从caffe-parallel的实现中可以看出一下一些问题:
每一个client对应一个thread,意味着client的数量不能太多,否则thread过多已经通信拥堵将成为瓶颈。可以采用树形结构的多机的server进行解决。
一个client计算完一个迭代,参数采用mpi_isend的异步方式发送回server,但是此时的client还是会被阻塞在接收新参数和新任务的位置,这里的异步作用不大,也可以看出,参数服务器设计的越异步越能利用client的计算能力。
参数的通信没有经过压缩,一个diff及其小的参数甚至可以不发送以减小通信数据量,因为多机并行的目的就是为了训练更大的网络更多的参数,这意味着参数的通信将是一个比较大的数据量压缩是必要的。
net的改动不大,当backward的时候每一个layer Backward结束后将这一个layer的参数异步发送到server。
data_layer 的改动在于如何读取数据,每个client独立训练模型,他们的数据应该是不同的的batch。caffe支持两种数据格式,leveldb和lmdb。 其中lmdb支持多个模型同时读取一个数据集。并行计算机的数据通常存在全局硬盘中,所有的机器都能访问到同一个文件,lmdb的特性使得每一个 client进程可以独立的读取自己的数据。leveldb必须有0进程通过mpi_send发送给client。所以在DataLayerSetup函 数中,leveldb格式数据的数据集只在rank==0打开,而lmdb在每一个进程都打开。函数最后事先读取一份数据。
dataLayer 继承了BasePrefetchingDataLayer,数据的读取是在单独的线程中进行,每一组数据都是提前开始读取,在forward的时候可以减 少读数据的等待。从BasePrefetchingDataLayer<Dtype>::Forward_cpu的代码来看,leveldb 格式是mpi_recv接收来自0进程的数据。这部分数据来源要从solver分发迭代给client的时候说起。solver每发出一个迭代都会调用一 次net的ForwardBackwardRoot函数,几经辗转调用了ForwardPrefilledRoot函数,这里将类型为data的 layer调用了ForwardRoot函数,layer的ForwardRoot函数最终调用了父类的 BasePrefetchingDataLayer<Dtype>::Forward_cpu_root,这里0进程会 prefetchdata和mpi_send。
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      vector<Blob<Dtype>*>* top) {
       switch (this->layer_param_.data_param().backend()){
        case DataParameter_DB_LEVELDB:
        {
    MPI_Status status;
        status.MPI_ERROR=0;
    caffe_mpi_recv<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(),
                0,TAG_DATA_OUT,MPI_COMM_WORLD,&status);
    DLOG(INFO)<<"Recv Dataout status "<<status.MPI_ERROR;
    if (this->output_labels_) {
        caffe_mpi_recv<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(),
                0,TAG_DATA_OUT_IF,MPI_COMM_WORLD,&status);
    DLOG(INFO)<<"Recv Dataout if status "<<status.MPI_ERROR;
    }}
        break;
        case DataParameter_DB_LMDB:
        {
        Forward_cpu_test(bottom,top);
        }
        break;
        default:
    LOG(FATAL) << "Unknown database backend";
        }}void BasePrefetchingDataLayer<Dtype>::Forward_cpu_root(const vector<Blob<Dtype>*>& bottom,
      vector<Blob<Dtype>*>* top,const int source) {
       switch (this->layer_param_.data_param().backend()){
        case DataParameter_DB_LEVELDB:
        {
    Forward_cpu_test(bottom,top);
    caffe_mpi_send<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(),
                source,TAG_DATA_OUT,MPI_COMM_WORLD);
    if (this->output_labels_) {
        caffe_mpi_send<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(),
                source,TAG_DATA_OUT_IF,MPI_COMM_WORLD);
    }
    }
        break;
        case DataParameter_DB_LMDB:
        {
        }
        break;
        default:
    LOG(FATAL) << "Unknown database backend";
        }}lmdb的预取相对简单,每个client自行读取就可以。为了保证每个client取到不同的数据,每个client的mdb_cursor都向前跳了itr = this->taskiter * batch_size的位置,其中taskiter是该client获得的迭代任务的iter。代码在void DataLayer<Dtype>::InternalThreadEntry(),该函数是PrefectchThread执行的函数。
void BasePrefetchingDataLayer<Dtype>::Forward_cpu_test(
    const vector<Blob<Dtype>*>& bottom, vector<Blob<Dtype>*>* top) {
  // First, join the thread  JoinPrefetchThread();
  // Copy the data  caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(),
             (*top)[0]->mutable_cpu_data());
  if (this->output_labels_) {
    caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(),
               (*top)[1]->mutable_cpu_data());
  }
  // Start a new prefetch thread  CreatePrefetchThread();}