A fault-tolerant BSP framework on Hadoop YARN
We looked for a distributed computing framework for running iterative scientific computing algorithms in the cloud, but most existing solutions proved lacking in some respect. We therefore decided to create our own framework, NEWT, with the following goals:
- Provide automatic fault recovery.
- Retain the program state after fault recovery.
- Provide a convenient programming interface.
- Support (iterative) scientific computing applications.
The following example is a solver for linear systems of equations, based on the conjugate gradient method and implemented in NEWT:
- First, the program definition has to be registered with the application master:
public class CGTestMaster {
    public static void main(String[] args) {
        //simply initialize the default configuration
        Configuration conf = NEWTConfiguration.createDefault();
        //the test data will be generated by the worker nodes
        //so only use commandline arguments as input and
        //specify the number of processes to request
        Input input = new CommandLineInput(conf, args, Integer.parseInt(args[0]));
        //create an application master, using the configuration,
        //the state class definition and the input
        FaultTolerantJobMaster am =
            new FaultTolerantJobMaster(conf, CGState.class, input);
        //register the application code with the master
        am.addStage("init", new CG.Init());
        am.addStage("beginLoop", new CG.BeginLoop());
        am.addStage("checkCondition", new CG.CheckCondition());
        am.addStage("doMatVec", new CG.DoMatVec());
        //register our custom message type
        am.registerMessageType(DoubleArrayWritable.class);
        //finally, run the application master
        am.run();
    }
}
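To illustrate how the registered stage names fit together: each stage's execute method returns the name of the stage to run next, or Stage.STAGE_END to stop (CheckCondition, shown further below, does exactly this). The following is a minimal sketch of such a name-driven superstep loop, not NEWT's actual implementation. SimpleStage, run, demo and the single-counter state are hypothetical stand-ins, and the sketch uses Java 8 lambdas for brevity.

```java
import java.util.HashMap;
import java.util.Map;

final class StageLoopSketch {
    // Hypothetical stand-in for the framework's Stage.STAGE_END marker.
    static final String STAGE_END = "end";

    // Hypothetical, simplified stage: a real NEWT stage also receives a communicator.
    interface SimpleStage {
        String execute(int[] state, int superstep);
    }

    // Runs stages by name until one of them returns STAGE_END.
    // Returns the number of supersteps that were executed.
    static int run(Map<String, SimpleStage> stages, String first, int[] state) {
        String next = first;
        int superstep = 0;
        while (!STAGE_END.equals(next)) {
            SimpleStage stage = stages.get(next);
            if (stage == null) {
                throw new IllegalStateException("Unknown stage: " + next);
            }
            next = stage.execute(state, superstep);
            // In a BSP framework, message delivery and barrier
            // synchronization would happen between supersteps here.
            superstep++;
        }
        return superstep;
    }

    // Small demo: count to 3, alternating between a check stage and a work stage.
    static int[] demo() {
        Map<String, SimpleStage> stages = new HashMap<>();
        stages.put("init", (state, step) -> { state[0] = 0; return "checkCondition"; });
        stages.put("checkCondition", (state, step) -> state[0] < 3 ? "work" : STAGE_END);
        stages.put("work", (state, step) -> { state[0]++; return "checkCondition"; });

        int[] state = new int[1];
        int supersteps = run(stages, "init", state);
        return new int[] { state[0], supersteps };
    }
}
```

Because control flow is expressed as stage names rather than as a call stack, the only things that need to survive a failure are the state object and the name of the next stage, which is presumably what makes this style convenient for recovery.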
CGState.class contains the definitions of all state variables used by the algorithm, along with instructions for serializing them to a byte stream. The objects passed to am.addStage contain the code of the program. One of these stages is shown here:
//verbose Java 1.6 "closure" definition
//could be made much more concise with lambdas (Java 8)
//or a Scala interface
public static class CheckCondition extends Stage {
    //the method takes the BSPComm communicator,
    //the state object and the superstep number as arguments
    @Override
    public String execute(BSPComm bsp, CGState state, int superstep) {
        //we know that on the previous superstep each process
        //sent a message of 2 doubles, containing the partial
        //dot product and local maximum norm of the residual
        //vector (which is used as an error estimate)
        double[] recv = getFromAll(bsp, 2);
        state.u += recv[0];
        state.error = Math.max(state.error, recv[1]);
        //depending on some state variables, determine what should be done next
        if (state.error > TOLERANCE && state.it <= MAX_IT) {
            if (state.it == 0) {
                for (int i = 0; i < state.p.size(); i++) {
                    state.p.getData()[i] = state.z.getData()[i];
                }
            } else {
                double v = state.u/state.ou;
                CGMath.scalevector(state.p.getData(), v);
                CGMath.addVector(state.p.getData(), state.z.getData());
            }
            //we need to synchronize the vector p for matrix vector multiplication
            sendToNeighbors(bsp, state, state.p.getData());
            //do matrix vector multiplication on the next superstep
            return "doMatVec";
        } else {
            //we're done, write result to disk and stop
            return Stage.STAGE_END;
        }
    }
}
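CheckCondition reads and updates fields such as state.u, state.ou, state.error, state.it and state.p, all of which must survive a failure. The following is a rough sketch of what a serializable state class could look like, assuming a write/readFields pair in the style of Hadoop's Writable interface. The real CGState is not shown in this post, so the class name StateSketch, the reduced field set, the plain double[] for p, and the checkpoint and restore helpers are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class StateSketch {
    // Hypothetical, reduced set of CG state variables.
    double u;      // current dot product
    double ou;     // dot product from the previous iteration
    double error;  // current error estimate
    int it;        // iteration counter
    double[] p = new double[0]; // local part of the search direction

    // Writes every state variable in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeDouble(u);
        out.writeDouble(ou);
        out.writeDouble(error);
        out.writeInt(it);
        out.writeInt(p.length);
        for (double value : p) {
            out.writeDouble(value);
        }
    }

    // Reads the state variables back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        u = in.readDouble();
        ou = in.readDouble();
        error = in.readDouble();
        it = in.readInt();
        p = new double[in.readInt()];
        for (int i = 0; i < p.length; i++) {
            p[i] = in.readDouble();
        }
    }

    // Serializes the state to a byte array, as a checkpoint might.
    static byte[] checkpoint(StateSketch state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            state.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds a state object from a previously written checkpoint.
    static StateSketch restore(byte[] data) {
        StateSketch state = new StateSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            state.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }
}
```

A process restarted after a fault could call something like restore on the last checkpoint and continue from the stage that was due to run next, rather than starting the iteration from scratch.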
In CheckCondition, getFromAll and sendToNeighbors are helper methods implemented with the communicator’s send function; they transmit parts of the state held in the CGState object. Since these are common communication patterns, they might eventually be implemented at the framework level. The mathematical operations performed on the state variables are implemented in a separate class, CGMath.
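The CGMath class itself is not shown in this post. As a hedged sketch of what the two vector helpers called from CheckCondition might do, assuming both operate in place on plain double arrays, the search-direction update p = z + (u/ou) * p can be written as below. The class name VectorOpsSketch and the method updateDirection are hypothetical.

```java
final class VectorOpsSketch {
    // Scales x in place: x[i] = factor * x[i].
    static void scaleVector(double[] x, double factor) {
        for (int i = 0; i < x.length; i++) {
            x[i] *= factor;
        }
    }

    // Adds y to x in place: x[i] = x[i] + y[i].
    static void addVector(double[] x, double[] y) {
        if (x.length != y.length) {
            throw new IllegalArgumentException("Vector lengths differ");
        }
        for (int i = 0; i < x.length; i++) {
            x[i] += y[i];
        }
    }

    // Conjugate gradient direction update: p = z + (u / ou) * p,
    // where u and ou are the current and previous dot products.
    static void updateDirection(double[] p, double[] z, double u, double ou) {
        scaleVector(p, u / ou);
        addVector(p, z);
    }
}
```

For example, with p = {1, 2}, z = {3, 4}, u = 4 and ou = 2, updateDirection leaves p = {5, 8}.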
- To run the program, it has to be submitted to the YARN cluster:
public class TestJob {
    public static void main(String[] args) {
        NEWTClient client = new NEWTClient();
        client.submit(CGTestMaster.class, args, true);
        client.close();
    }
}
For some general information on NEWT, refer to this poster.
While the framework is not yet mature enough to be used for serious projects, it is open source and available at: https://bitbucket.org/mobilecloudlab/newt