caffe-parallel通过参数服务器的方式对caffe进行了多机数据并行,底层通信采用MPI。下面记录一些看源码过程中的一些笔记,以免自己忘记。
caffe-parallel主要修改了solver.cpp net.cpp data_layer.cpp等几个文件。采用多线程的方式实现了参数服务器。0号处理器为参数服务器,有n个线程(n表示共有n个核),其中n-1个ComputeValueThreadClient线程分别对应其余n-1个核,用于异步的接收n-1个核上传来的参数,并向他们发送数据。剩余的n-1个核都是client。1个ComputeValueThreadServer线程用于更新全局参数。
server上维护一个idleQ队列,存放当前空闲的client,server将每一个迭代分发给了client,从idleQ中取出一个空闲client。
client负责一次迭代的forwardbackward,这部分代码没有变,因为caffe-parallel实现的是数据并行,每一个client的计算任务是独立的,并将新的参数异步通信的发送给server,server的ComputeValueThreadClient线程接收到新参数之后会将参数存到tempDiff[tid]下(getValue函数),注意client发送给server的是两次参数的diff而不是新参数,这样可以利用压缩手段节约通信带宽,实际上caffe-parallel没有进行这部分优化。参数存完之后flagCC被标记为1,++upNum,表示这个线程对应的client发送来了新参数。接下来线程在ComputeValueClient的如下代码位置进行等待,直到ComputeValueThreadServer阶段性的更新完全局参数,broadcast出condCtrl条件变量。然后将新参数的diff发送个对应的client,并将该client压入idleQ队列。此时的client将处在等待接收新参数以及接收一个新的迭代步的阻塞中。
lockmutex lockm(&mutexData); while(upNum!=0){ pthread_cond_wait(&condCtrl,&mutexData); break; } flagCC[tid]=0;
server上的ComputeValueThreadServer会采用pthread_cond_timedwait的方式等待wait_time秒的时间,如果所有client都上传了参数或者等待时间超过,则调用ComputeValueServer()函数对全局参数进行更新。并将diff拷贝到tempDiff中,upNum=0。具体实现代码如下
lockmutex lockm(&mutexData); while(upNum < upSum){ timeoutret=pthread_cond_timedwait(&condUp,&mutexData,&wait_time); if(timeoutret==ETIMEDOUT){ LOG(INFO)<<"time out " << upNum; break; } } if(upNum>0){ layer->ComputeValueServer(); pthread_cond_broadcast(&condCtrl); }
其中ComputeValueServer()函数调用了ComputeUpdateValueServerThread函数,进行全局参数的更新,并调用net->Update()对网络进行更新。
从caffe-parallel的实现中可以看出一下一些问题:
每一个client对应一个thread,意味着client的数量不能太多,否则thread过多已经通信拥堵将成为瓶颈。可以采用树形结构的多机的server进行解决。
一个client计算完一个迭代,参数采用mpi_isend的异步方式发送回server,但是此时的client还是会被阻塞在接收新参数和新任务的位置,这里的异步作用不大,也可以看出,参数服务器设计的越异步越能利用client的计算能力。
参数的通信没有经过压缩,一个diff及其小的参数甚至可以不发送以减小通信数据量,因为多机并行的目的就是为了训练更大的网络更多的参数,这意味着参数的通信将是一个比较大的数据量压缩是必要的。
net的改动不大,当backward的时候每一个layer Backward结束后将这一个layer的参数异步发送到server。
data_layer 的改动在于如何读取数据,每个client独立训练模型,他们的数据应该是不同的的batch。caffe支持两种数据格式,leveldb和lmdb。 其中lmdb支持多个模型同时读取一个数据集。并行计算机的数据通常存在全局硬盘中,所有的机器都能访问到同一个文件,lmdb的特性使得每一个 client进程可以独立的读取自己的数据。leveldb必须有0进程通过mpi_send发送给client。所以在DataLayerSetup函 数中,leveldb格式数据的数据集只在rank==0打开,而lmdb在每一个进程都打开。函数最后事先读取一份数据。
dataLayer 继承了BasePrefetchingDataLayer,数据的读取是在单独的线程中进行,每一组数据都是提前开始读取,在forward的时候可以减 少读数据的等待。从BasePrefetchingDataLayer<Dtype>::Forward_cpu的代码来看,leveldb 格式是mpi_recv接收来自0进程的数据。这部分数据来源要从solver分发迭代给client的时候说起。solver每发出一个迭代都会调用一 次net的ForwardBackwardRoot函数,几经辗转调用了ForwardPrefilledRoot函数,这里将类型为data的 layer调用了ForwardRoot函数,layer的ForwardRoot函数最终调用了父类的 BasePrefetchingDataLayer<Dtype>::Forward_cpu_root,这里0进程会 prefetchdata和mpi_send。
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom, vector<Blob<Dtype>*>* top) { switch (this->layer_param_.data_param().backend()){ case DataParameter_DB_LEVELDB: { MPI_Status status; status.MPI_ERROR=0; caffe_mpi_recv<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(), 0,TAG_DATA_OUT,MPI_COMM_WORLD,&status); DLOG(INFO)<<"Recv Dataout status "<<status.MPI_ERROR; if (this->output_labels_) { caffe_mpi_recv<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(), 0,TAG_DATA_OUT_IF,MPI_COMM_WORLD,&status); DLOG(INFO)<<"Recv Dataout if status "<<status.MPI_ERROR; }} break; case DataParameter_DB_LMDB: { Forward_cpu_test(bottom,top); } break; default: LOG(FATAL) << "Unknown database backend"; }}
void BasePrefetchingDataLayer<Dtype>::Forward_cpu_root(const vector<Blob<Dtype>*>& bottom, vector<Blob<Dtype>*>* top,const int source) { switch (this->layer_param_.data_param().backend()){ case DataParameter_DB_LEVELDB: { Forward_cpu_test(bottom,top); caffe_mpi_send<Dtype>((*top)[0]->mutable_cpu_data(),prefetch_data_.count(), source,TAG_DATA_OUT,MPI_COMM_WORLD); if (this->output_labels_) { caffe_mpi_send<Dtype>((*top)[1]->mutable_cpu_data(),prefetch_label_.count(), source,TAG_DATA_OUT_IF,MPI_COMM_WORLD); } } break; case DataParameter_DB_LMDB: { } break; default: LOG(FATAL) << "Unknown database backend"; }}
lmdb的预取相对简单,每个client自行读取就可以。为了保证每个client取到不同的数据,每个client的mdb_cursor都向前跳了itr = this->taskiter * batch_size的位置,其中taskiter是该client获得的迭代任务的iter。代码在void DataLayer<Dtype>::InternalThreadEntry(),该函数是PrefectchThread执行的函数。
void BasePrefetchingDataLayer<Dtype>::Forward_cpu_test( const vector<Blob<Dtype>*>& bottom, vector<Blob<Dtype>*>* top) { // First, join the thread JoinPrefetchThread(); // Copy the data caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(), (*top)[0]->mutable_cpu_data()); if (this->output_labels_) { caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(), (*top)[1]->mutable_cpu_data()); } // Start a new prefetch thread CreatePrefetchThread();}