caffe-parallel概要

    caffe-parallel通过参数服务器的方式对caffe进行了多机数据并行,底层通信采用MPI。下面记录一些看源码过程中的一些笔记,以免自己忘记。

    caffe-parallel主要修改了solver.cpp net.cpp data_layer.cpp等几个文件。采用多线程的方式实现了参数服务器。0号处理器为参数服务器,有n个线程(n表示共有n个核),其中n-1个ComputeValueThreadClient线程分别对应其余n-1个核,用于异步的接收n-1个核上传来的参数,并向他们发送数据。剩余的n-1个核都是client。1个ComputeValueThreadServer线程用于更新全局参数。

    server上维护一个idleQ队列,存放当前空闲的client,server将每一个迭代分发给了client,从idleQ中取出一个空闲client。

client的实现

    client负责一次迭代的forwardbackward,这部分代码没有变,因为caffe-parallel实现的是数据并行,每一个client的计算任务是独立的,并将新的参数异步通信的发送给server,server的ComputeValueThreadClient线程接收到新参数之后会将参数存到tempDiff[tid]下(getValue函数),注意client发送给server的是两次参数的diff而不是新参数,这样可以利用压缩手段节约通信带宽,实际上caffe-parallel没有进行这部分优化。参数存完之后flagCC被标记为1,++upNum,表示这个线程对应的client发送来了新参数。接下来线程在ComputeValueClient的如下代码位置进行等待,直到ComputeValueThreadServer阶段性的更新完全局参数,broadcast出condCtrl条件变量。然后将新参数的diff发送个对应的client,并将该client压入idleQ队列。此时的client将处在等待接收新参数以及接收一个新的迭代步的阻塞中。

lockmutex lockm(&mutexData);
while(upNum!=0){
    pthread_cond_wait(&condCtrl,&mutexData);
    break;
}
flagCC[tid]=0;

server的实现

    server上的ComputeValueThreadServer会采用pthread_cond_timedwait的方式等待wait_time秒的时间,如果所有client都上传了参数或者等待时间超过,则调用ComputeValueServer()函数对全局参数进行更新。并将diff拷贝到tempDiff中,upNum=0。具体实现代码如下

lockmutex lockm(&mutexData);
while(upNum < upSum){
    timeoutret=pthread_cond_timedwait(&condUp,&mutexData,&wait_time);
    if(timeoutret==ETIMEDOUT){
        LOG(INFO)<<"time out " << upNum;
        break;
    }
}
if(upNum>0){
    layer->ComputeValueServer();
    pthread_cond_broadcast(&condCtrl);
}

其中ComputeValueServer()函数调用了ComputeUpdateValueServerThread函数,进行全局参数的更新,并调用net->Update()对网络进行更新。

瓶颈和优化

    从caffe-parallel的实现中可以看出一下一些问题:

    每一个client对应一个thread,意味着client的数量不能太多,否则thread过多已经通信拥堵将成为瓶颈。可以采用树形结构的多机的server进行解决。

    一个client计算完一个迭代,参数采用mpi_isend的异步方式发送回server,但是此时的client还是会被阻塞在接收新参数和新任务的位置,这里的异步作用不大,也可以看出,参数服务器设计的越异步越能利用client的计算能力。

    参数的通信没有经过压缩,一个diff及其小的参数甚至可以不发送以减小通信数据量,因为多机并行的目的就是为了训练更大的网络更多的参数,这意味着参数的通信将是一个比较大的数据量压缩是必要的。

net的改动   

net的改动不大,当backward的时候每一个layer Backward结束后将这一个layer的参数异步发送到server。

data_layer的数据并行

    data_layer 的改动在于如何读取数据,每个client独立训练模型,他们的数据应该是不同的的batch。caffe支持两种数据格式,leveldb和lmdb。 其中lmdb支持多个模型同时读取一个数据集。并行计算机的数据通常存在全局硬盘中,所有的机器都能访问到同一个文件,lmdb的特性使得每一个 client进程可以独立的读取自己的数据。leveldb必须有0进程通过mpi_send发送给client。所以在DataLayerSetup函 数中,leveldb格式数据的数据集只在rank==0打开,而lmdb在每一个进程都打开。函数最后事先读取一份数据。

leveldb数据并行

    dataLayer 继承了BasePrefetchingDataLayer,数据的读取是在单独的线程中进行,每一组数据都是提前开始读取,在forward的时候可以减 少读数据的等待。从BasePrefetchingDataLayer<Dtype>::Forward_cpu的代码来看,leveldb 格式是mpi_recv接收来自0进程的数据。这部分数据来源要从solver分发迭代给client的时候说起。solver每发出一个迭代都会调用一 次net的ForwardBackwardRoot函数,几经辗转调用了ForwardPrefilledRoot函数,这里将类型为data的 layer调用了ForwardRoot函数,layer的ForwardRoot函数最终调用了父类的 BasePrefetchingDataLayer<Dtype>::Forward_cpu_root,这里0进程会 prefetchdata和mpi_send。

void BasePrefetchingDataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
      vector<Blob<Dtype>*>* top) {
       switch (this->layer_param_.data_param().backend()){
        case DataParameter_DB_LEVELDB:
        {
    MPI_Status status;
        status.MPI_ERROR=0;
    caffe_mpi_recv<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(),
                0,TAG_DATA_OUT,MPI_COMM_WORLD,&status);
    DLOG(INFO)<<"Recv Dataout status "<<status.MPI_ERROR;
    if (this->output_labels_) {
        caffe_mpi_recv<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(),
                0,TAG_DATA_OUT_IF,MPI_COMM_WORLD,&status);
    DLOG(INFO)<<"Recv Dataout if status "<<status.MPI_ERROR;
    }}
        break;
        case DataParameter_DB_LMDB:
        {
        Forward_cpu_test(bottom,top);
        }
        break;
        default:
    LOG(FATAL) << "Unknown database backend";
        }}
void BasePrefetchingDataLayer<Dtype>::Forward_cpu_root(const vector<Blob<Dtype>*>& bottom,
      vector<Blob<Dtype>*>* top,const int source) {
       switch (this->layer_param_.data_param().backend()){
        case DataParameter_DB_LEVELDB:
        {
    Forward_cpu_test(bottom,top);
    caffe_mpi_send<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(),
                source,TAG_DATA_OUT,MPI_COMM_WORLD);
    if (this->output_labels_) {
        caffe_mpi_send<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(),
                source,TAG_DATA_OUT_IF,MPI_COMM_WORLD);
    }
    }
        break;
        case DataParameter_DB_LMDB:
        {
        }
        break;
        default:
    LOG(FATAL) << "Unknown database backend";
        }}

lmdb数据并行

    lmdb的预取相对简单,每个client自行读取就可以。为了保证每个client取到不同的数据,每个client的mdb_cursor都向前跳了itr = this->taskiter * batch_size的位置,其中taskiter是该client获得的迭代任务的iter。代码在void DataLayer<Dtype>::InternalThreadEntry(),该函数是PrefectchThread执行的函数。

void BasePrefetchingDataLayer<Dtype>::Forward_cpu_test(
    const vector<Blob<Dtype>*>& bottom, vector<Blob<Dtype>*>* top) {
  // First, join the thread  JoinPrefetchThread();
  // Copy the data  caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(),
             (*top)[0]->mutable_cpu_data());
  if (this->output_labels_) {
    caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(),
               (*top)[1]->mutable_cpu_data());
  }
  // Start a new prefetch thread  CreatePrefetchThread();}



留言:
han 说:
2016/4/5 8:43:0

有几个疑问,希望大大能回答。

所有client发送参数以后,server怎么处理接收到的参数?server全局更新以后发送给各个client的参数一样吗?是发送正确率最高的参数吗?

龙门外的鱼 回复: han
2016/4/5 9:44:14

这里只是记一下大概,所以不是太详细。caffe-parallel实现的是异步梯度下降算法,不需要所有client的参数,每次只是更新一小撮client发来的参数的diff(导数,梯度),处理的过程就是梯度下降。这些可以在异步梯度下降算法的相关资料里看到。

han 回复: 龙门外的鱼
2016/4/5 10:11:19

那么全局参数是指的什么,发回去的新参数是通过梯度下降得出的结果?是发给一个client还是所有client

龙门外的鱼 回复: han
2016/4/5 10:15:54

你应该先看一点梯度下降的材料。全局参数就是从client那里获取的diff,不断的被更新,一个client发来一个diff,server处理完这一批的diff之后会给这个client返回一个当前看来比较新的全局参数,client用这个新参数进行下一次的forwardBackward。请看“client的实现”第一段最后一句

han 回复: 龙门外的鱼
2016/4/5 10:39:12

感谢!前几天给longmenwaideyu@126.com发了封邮件不知道你看到了没有

龙门外的鱼 回复: han
2016/4/5 11:43:13

已加qq!咱们多交流,我也刚开始学习

顶一个 回复: 龙门外的鱼
2016/6/1 2:18:47

挺好的例子,性能问题?打开都要等以下才可以回复。

龙门外的鱼 回复: 顶一个
2016/6/1 2:50:33

比较懒,编辑器用的全功能版,几百K的大小,如果页面一打开就加载,在一些老电脑上会卡顿,所以setTimeout了一下。

guyang 说:
2017/5/29 10:42:36

请问能够共享一下源码吗?正在研究这个。

龙门外的鱼 回复: guyang
2017/5/30 14:17:20

源码在github上。https://github.com/sailorsb/caffe-parallel

guyang 回复: 龙门外的鱼
2017/5/31 1:50:4

thanks!