Overview
Petuum is a machine learning system for distributed machine learning system. The underlying idea of this system is Parameter Server. Parameter server is proposed to solve model training with huge amount of data and huge model size.
This kind of modeling usually found in online advertising business. Only in this level, we have billions of training data, billions of parameters for model. Because of capacity can’t be handled by a single machine, parameter server will distribute the model parameter and parameter updating across multiple machines.
For distributed system, there are several problems always need to be addressed:
-
Consistency handling
-
Synchronization across multiple machines
Role Assignment
For a normal parameter server system (i.e., the one implemented by Mu Li), each machined will be assigned a single role: Scheduler, Server, Worker.
Usually, scheduler will organize & coordinate workers and servers, issue the order to start the training, control the distribution of workload across workers, etc.
Worker role usually responsible for computing sufficient statistics from training data, request workload from scheduler, send sufficient statistics to server for model updating. requesting new parameters from servers.
Server role usually responsible for parameter storage, receiving sufficient statistics from worker, updating parameter on local machine and answering the parameter request from worker node.
In this design, following steps will be carried out:
-
start scheduler, waiting for registration of servers and workers.
-
start servers and workers, connect to scheduler to something. In the same time, scheduler will broadcast new joining node to all existing node to share the global information.
-
scheduler node will issue the command to start work.
-
worker node will request workload from scheduler, request parameters needed from server, computing sufficient statistics from local data, send sufficient statistics to server.
-
server node will receive sufficient statistics from worker node, refresh local parameters using statistics and answering requests from worker node.
-
all the training or activities stopped.
However, for Bosen, there is no such role assignment explicitly. This is kind of a weird design for me. How does each machine know its role, know the global network information, all the machines involving this work?
There is no scheduler node? This is bosen adopt a very fixed design system. For each running node, there is a specification file which will list all the ip address/port number, involved in one parameter server running.
Even better, each ip address/port will get an assigned number which is unique. So in this way, without the involvement of Scheduler node, the network topology information is acquired before it starts.
Of course, I think this isn’t a very flexible design, but it solve the problem in a different way 🙂
But there is another question, if we got the list of available machines, how do we know its role from its id? The answer is ever more astonishing! For each involved machine, there are two kinds of thread: background thread and server thread.
In this way, worker role and server role existed in the same machine. Also in this way, thread will be the first citizen of this new system, worker/server role will just be played by a thread rather than a machine.
We get a even better things, each machine could have multiple background/server threads severing the role of worker/server.
But do we really got no scheduler node? The answer is no. Inside the machine of ID 0, there is a special thread “Node Thread”. Node thread play the role of scheduler, all the background thread, server thread will register to this “name thread”, get associated topology fro name node.
So inside this framework, here are the summary of thread type:
-
application thread: thread doing business logic, using parameters to some computing.
-
background thread: maintaining the local parameters, responsible the request from application thread and communicate with server thread.
-
server thread: hold the ground truth of parameters, maintaining global synchronization and server request from background thread.
So this means, application thread only work with background thread.
Table Creation Flow
So how to create a table inside Bosen system? Here are the steps:
-
Create a PSTableGroupOption, you need to give global information in this struct: #tables will be created, #communication channel in each client.
-
Create TableOption struct to create each table specifically.
-
Finish the table creation and start application thread to access the table directly.
There are several important information behind those scenarios:
-
There are class provide static method for global access: PSTableGroup, GlobalContext. This means all the importance request are issued through static method of PSTableGroup class. And GlobalContext class contain all the shared information across different treads.
-
application thread can only access global interface like PSTableGroup, PSTableGroup will forward its message to related thread for processing.
What does this happens behind the API calls?
-
init thread (i.e., main thread) will issue a request to create table (which contain meta information about this table) to background thread.
-
background thread receive this message request from init thread.
-
Then the first background thread will send a create table request to Name Node.
-
Upon receiving a request of create table, Name Node will issue a request to all server thread to create table with specified meta.
-
Server thread will create table using meta information, then send back responses which indicate success creation of table in server thread.
-
Name Node receive success message from all server thread, Name Node will send back reply message to each background thread to confirm the request
-
background thread will create a local version of table, which will store temporary parameters from server.
So from this scenario, we know the interaction mode: application thread hold local copy of table, send request to background thread, then background thread will send request to server thread, background thread will handle reply from server thread as well.
Synchronization of Table Change
Another key issue for Parameter Server is synchronization, there are two kinds of synchronization needed:
-
Synchronization of actions across different works & servers.
-
Synchronization of parameter update between worker and server.
In PS (by Mu Li), all the synchronization is controlled by a message timestamp. Even though each parameter table can have different timestamp, but once you set them with controlled, then you can control the sync.
I think the design is similar in Bosen. In Bosen, each application thread has its own timestamp, controlled by a vector_block which accessible by all application thread.
Each time when a application thread make a change, it will change its corresponding timestamp. Once global minimum timestamp change, then workers in one machine will start the synchronization to worker.
But how do changes pushed to server? in what format?
The answer is: The updates are organized in the class of AbstractOpLog. Each table/row will has its own OpLog.
Difference with Other System
In general, Bosen is a very interesting parameter server system. But there are other similar system: Parameter Server by Mu Li (PS), MultiViso by Microsoft.
What’s the difference between different system?
Bosen designed a system which provide indirect access to local table directly. If application thread want to access the table, the application thread can only get a pointer to underlying table, request all the changes through a static class interface. All the relevant changes, sync message usage are controlled by background thread. Background thread will also responsible for communicating with server.
However, PS use a different design, which expose all the technical details to the table user. You will have a direct access to the table, you will need to know how the sync design, you need to know how to control it, the dependencies between different timestamp. This is a much stronger control, more flexible than previous one.
MultiViso is somewhere between those two. So you get the table access, but you don’t need to worry about sync too much. I think only synchronous algorithm is allowed?
Written with StackEdit.