Supporting control flow in the CIEL execution engine
How do you write a program that runs on hundreds or thousands of computers? Over the last decade, this has become a real concern for many companies that must be able to handle ever-growing data sets in order to stay in business. When those data sets grow to terabytes or petabytes in size, a single disk (or even a RAID array) can't deliver the data fast enough, so a solution is needed to exploit the throughput of hundreds or thousands of disks in parallel. In this post, I'll introduce various solutions to this problem, and explain how our CIEL execution engine supports a larger class of algorithms than existing systems.
The last decade has seen the rise of distributed execution engines: systems that automatically execute a program, in parallel, on a distributed computing cluster. The most influential system in this field has been Google's MapReduce, which runs simple programs defined by two functions: map(), which operates on the input records to produce intermediate data; and reduce(), which operates on the intermediate data to produce a final result. The simplicity of MapReduce led to several clones, including a popular open-source version called Apache Hadoop MapReduce. Around the same time, Microsoft developed a more general execution engine, called Dryad, which operates on programs that are written as data-flow graphs. In general, these distributed execution engines take a declarative specification of some computation, extract parallelism by examining the data flow between steps in the computation, and execute the resulting program on a cluster of hundreds or thousands of computers. Does this answer our initial question, then?
Unfortunately not. Many algorithms contain data-dependent control flow, and cannot be expressed using existing execution engines. Consider the following iterative algorithm:
    do {
        next = do_lots_of_work(inputs);
        converged = aggregate(next);
    } while (!converged);
Here, the result of the do_lots_of_work() function is used to decide whether or not the while loop should terminate. In general, this means that the amount of work depends on the input data, and can only be determined by actually running the algorithm. MapReduce and Dryad require a complete list of tasks to be provided when a job is submitted, so they cannot natively handle this type of algorithm. Instead, the user must write a separate driver program, which submits multiple jobs, fetches their results, and decides when to terminate the computation. Since the driver program runs outside the cluster, it doesn't enjoy the benefits of running on an execution engine, in particular transparent fault tolerance. So if the driver program crashes, or loses network connectivity to the cluster, the entire computation is lost.
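To make the driver-program pattern concrete, here is a minimal Python sketch. It is not code from MapReduce, Dryad or CIEL: run_driver, submit_job and fake_job are hypothetical names, and fake_job simply stands in for one job submission that performs do_lots_of_work() and aggregate() on a cluster.

```python
def run_driver(submit_job, inputs, max_iterations=100):
    """Hypothetical driver loop that runs outside the cluster.

    submit_job is assumed to block until one job finishes and to
    return (next_state, converged).
    """
    state = inputs
    iterations = 0
    while True:
        # One complete job per iteration: the engine sees each job in
        # isolation and knows nothing about the loop around it.
        state, converged = submit_job(state)
        iterations += 1
        if converged or iterations >= max_iterations:
            break
    return state, iterations


def fake_job(values):
    """Stand-in for a cluster job (an assumption for illustration)."""
    halved = [v / 2 for v in values]   # plays the role of do_lots_of_work()
    converged = max(halved) < 1.0      # plays the role of aggregate()
    return halved, converged
```

All of the loop state (state and iterations) lives in the driver process, which is why losing that process loses the computation.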
To solve this problem, we have developed CIEL, a distributed execution engine that executes dynamic task graphs. A dynamic task graph is like a Dryad data-flow graph, but it also allows tasks to rewrite the graph by spawning new tasks and delegating their outputs.
In the above example, you can see the dynamic task graph before and after running task A, which takes one input, u, and has one output, z. When task A is run, it spawns three tasks (B, C and D), and delegates production of its output (z) to D. After task A runs, tasks B and C can now run in parallel. Another thing to note is that any task can similarly decide to spawn new tasks, so for example B or C might decide to spawn more tasks in a divide-and-conquer fashion, and D could decide to spawn another iteration of the same graph. By giving tasks the ability to create more work dynamically based on their inputs, we can implement iterative and recursive algorithms in CIEL. A task can spawn any acyclic graph of tasks, so CIEL can also implement MapReduce- and Dryad-style computations.
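The spawn-and-delegate behaviour described here can be imitated with a toy, single-process scheduler in Python. This is only a sketch under my own assumptions, not CIEL's implementation or API: TaskGraph, DELEGATED and the task bodies are invented for illustration, and I have assumed that B and C both read u and that D combines their outputs, which I have called x and y.

```python
DELEGATED = object()  # marker: "another task will produce my output"


class TaskGraph:
    """Toy dynamic task graph; tasks may add new tasks while running."""

    def __init__(self):
        self.values = {}   # output name -> concrete value
        self.tasks = []    # pending (function, input names, output name)

    def spawn(self, fn, inputs, output):
        self.tasks.append((fn, list(inputs), output))

    def run(self):
        while self.tasks:
            # A task is runnable once all of its inputs are concrete.
            runnable = [t for t in self.tasks
                        if all(name in self.values for name in t[1])]
            if not runnable:
                raise RuntimeError("no runnable task: missing input")
            for task in runnable:
                self.tasks.remove(task)
                fn, inputs, output = task
                result = fn(self, *[self.values[name] for name in inputs])
                if result is not DELEGATED:
                    self.values[output] = result


def task_a(graph, u):
    # A rewrites the graph: it spawns B, C and D, and delegates the
    # production of its own output z to D.
    graph.spawn(task_b, ["u"], "x")
    graph.spawn(task_c, ["u"], "y")
    graph.spawn(task_d, ["x", "y"], "z")
    return DELEGATED


def task_b(graph, u):
    return u + 1


def task_c(graph, u):
    return u * 2


def task_d(graph, x, y):
    return x + y
```

Seeding the graph with a value for u, spawning task_a with output z and calling run() executes A first, then B and C in the same scheduling round, and finally D, which writes z. A real engine would run the tasks in each round in parallel on different machines; this sketch runs them one after another.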
However, these dynamic task graphs are more complicated to specify than the static graphs of Dryad, or the two functions of MapReduce. To solve that problem, we created a scripting language called Skywriting, which allows users to create dynamic task graphs using a familiar imperative programming style:
    z = 0;
    do {
        x = spawn(b, [v, z]);
        y = spawn(c, [w, z]);
        z = *x + *y;
    } while (z < 100);
In Skywriting, the spawn() built-in function takes the name of a Skywriting function and a list of arguments, and creates a task to execute that function asynchronously. The * (dereference) operator takes the result of a call to spawn() and blocks the current task until the result is available. Hence a Skywriting script can both create more work, and use the result of that work to decide what to do next, as in the example above. The Skywriting runtime transforms the whole script into a dynamic task graph, which allows it to run as a single CIEL job, and hence enjoy all the benefits of running on an execution engine (such as fault tolerance). The details of this transformation are described more fully in our recent paper about CIEL and Skywriting.
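If you haven't seen Skywriting before, a rough single-machine analogy in Python may help. In this sketch, pool.submit() plays the role of spawn() and Future.result() plays the role of the * operator. It is only an analogy: it uses threads on one machine rather than a CIEL cluster, it does not show how the Skywriting runtime builds a dynamic task graph, and the bodies of b and c are placeholders that I made up.

```python
from concurrent.futures import ThreadPoolExecutor


def b(v, z):
    return v + z   # placeholder work; the real b is not given in the post


def c(w, z):
    return w + z   # placeholder work; the real c is not given in the post


def iterate(v, w, limit=100):
    z = 0
    rounds = 0
    with ThreadPoolExecutor(max_workers=2) as pool:
        while True:
            x = pool.submit(b, v, z)     # like spawn(b, [v, z])
            y = pool.submit(c, w, z)     # like spawn(c, [w, z])
            z = x.result() + y.result()  # like *x + *y: block until ready
            rounds += 1
            if z >= limit:               # data-dependent loop exit
                break
    return z, rounds
```

As in the Skywriting script, the two calls run concurrently within each iteration, and the loop condition depends on values that only exist once that work has finished.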
Hopefully this post has whetted your appetite for CIEL. If you would like more information, please visit our project website, or download the source code on GitHub. We're currently developing new features for CIEL, including support for many-core machines and additional programming languages. Watch this space for more details soon!