Orchestrating a Unit of Work
The following class diagram shows the inheritance and composition used in Exelixi:
The classes shown in white are the general purpose part of the hierarchy, reused for all workflows. Classes shown in dark gray are specific to GAs (dare we say, the "lowerarchy"). In other words, most of this framework is for general purpose distributed workflows in Python atop Mesos.
We've already covered a step-by-step tutorial about how to build a framework in Python atop Apache Mesos.
That concluded with mention of a UnitOfWork orchestrating some services to perform a distributed workflow.
Bueno.
Let's drill-down on that notion of workflow...
In terms of Exelixi, a workflow involves some sequence of distributed tasks performed on a set of distributed data.
Ostensibly the workflow being performed is too big to fit on a single computer, so it must be partitioned and sequenced --
in other words, divided into chunks and then handled piecemeal.
Hopefully those distributed tasks can be performed efficiently in parallel on a cluster of many computers; otherwise this will become a huge mess and a big waste of effort.
Think positive thoughts...
We say that the UnitOfWork performs orchestration as its process of synchronizing the distributed tasks and distributed data across the cluster.
This process leverages a set of workers --
which Mesos has previously allocated and launched on behalf of the framework.
We call each worker's partition of distributed data a shard.
Stated quite simply, the workers perform all those distributed tasks and manage all the shards.
Upton Sinclair would be proud.
Next, let's take a deeper look at how the workers function. Each worker has four primary components:
- set of REST endpoints
- a hash ring identifying the other worker services
- a shard of local data
- a task queue of parallel tasks
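To make that list concrete, here's a minimal sketch of what those four components might look like as worker state. The class and attribute names are illustrative, not the actual Exelixi classes; the sketch only assumes gevent is installed.

```python
# a minimal sketch of a worker's state -- illustrative names, not the actual Exelixi classes
from gevent.event import Event
from gevent.queue import JoinableQueue


class WorkerState:
    def __init__(self):
        self.endpoints = {}                 # REST endpoints, registered by URL path
        self.hash_ring = None               # hash ring identifying the other worker services
        self.shard = {}                     # shard of local data, as key/value pairs
        self.task_queue = JoinableQueue()   # task queue of parallel tasks
        self.done_sending = Event()         # used for barrier synchronization, described below
```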
When a worker first launches, it starts a WSGI service on a designated port.
By default, we use 9311 for the port number, because it has super-secret occult meanings attached.
Or something.
At that point, the service is up and running.
However, the worker has no UnitOfWork defined, no hash ring initialized, no shard of local data, and its task queue is completely empty.
It's just a formless blob of goo, so to speak.
To get the party started, as soon as the Mesos executors have launched all the workers,
the Exelixi framework calls its instance of UnitOfWork with a list of those workers.
The UnitOfWork then claims and configures each worker, and proceeds through its business of orchestration.
Meanwhile, a worker service implements a set of REST endpoints, effectively providing remote procedure calls that can be accessed over HTTP.
Some endpoints are general-purpose, while others are specific to the UnitOfWork and get delegated through it.
Also note that each of the endpoints is implemented as a Greenlet based on gevent coroutines.
In theory, that allows the endpoints to run in parallel.
Eventually, Exelixi will be extended so that workers checkpoint their state to HDFS on a regular basis ...
That way, if and when a worker dies, Mesos will notify resource.MesosScheduler, which will schedule resources for a replacement worker and pick up from the last checkpoint.
That's definitely on our bucket list.
Three of the endpoints provide a lifecycle definition for the worker:
- REST /shard/config -- configure the worker with its initial parameters
- REST /ring/init -- initialize the hash ring with a list of other workers
- REST /shard/stop -- shutdown the worker
Those endpoints get called from the UnitOfWork, which we'll cover a bit later in this section.
Looking at the /shard/config endpoint specifically, the initial parameters required for configuring a worker service are:
- prefix -- a path within the durable storage, based on a UUID, shared among the workers and unique for this UnitOfWork
- shard_id -- a unique identifier for this shard within the hash ring
- a reference by name for a Python class based on UnitOfWorkFactory
Great. That gives the worker a special place to preserve state, a unique identity among the other workers, and a Python class to load that will define the workflow to be performed.
Note that once a worker service has been configured, it cannot be accessed without the proper credentials (prefix plus shard_id), nor can it be reconfigured.
In other words, the worker service gets dedicated to a particular UnitOfWork definition, which is referenced by name via the UnitOfWorkFactory.
Running some other workflow would require a whole new hash ring of workers to be launched and configured.
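As a rough sketch of how the UnitOfWork might claim and configure each worker over HTTP: the endpoint paths and the port come from above, but the payload field names, the worker addresses, the UnitOfWorkFactory subclass name, and the use of the requests library are all assumptions here, not Exelixi's actual wire format.

```python
# sketch: claiming and configuring the workers from the UnitOfWork
# (payload field names and use of `requests` are assumptions, not Exelixi's actual wire format)
import uuid
import requests

workers = ["10.0.0.11:9311", "10.0.0.12:9311"]    # hypothetical worker host:port pairs
prefix = f"exelixi/{uuid.uuid4()}"                # shared path in durable storage for this UnitOfWork

for shard_id, worker in enumerate(workers):
    requests.post(f"http://{worker}/shard/config", json={
        "prefix": prefix,
        "shard_id": str(shard_id),
        "uow_name": "my_pkg.MyUnitOfWorkFactory",  # hypothetical UnitOfWorkFactory subclass
    })

# once every worker has been configured, distribute the full hash ring
for worker in workers:
    requests.post(f"http://{worker}/ring/init", json={"ring": workers})
```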
An old man walks into a store and asks, "Say, do you have any of those whaddaya-call-its, those square tuits?" The confused retail clerk replies, "Huh, what's a tuit?" The man answers, "Well, my son said he'd come visit me when he got a round tuit, but there ain't none of those apparently. So I'm looking to find a square tuit and see if that's good enough."
On that note, a few of the REST endpoints have not been implemented yet for Exelixi:
- REST /ring/add -- add another worker to the hash ring (not impl)
- REST /ring/del -- delete a worker from the hash ring (not impl)
- REST /check/persist -- persist a checkpoint to disk, then to HDFS (not impl)
- REST /check/recover -- recover from last good checkpoint in HDFS (not impl)
We're still looking to get a round tuit. Or something.
The previous sections have been dropping a few hints. Recall that "each worker also provides a task queue, plus a global barrier to synchronize the distributed processing..." Also recall that "it helps to have some lightweight messaging available among the shards..."
A barrier pattern is a synchronization method used in parallel computing. Some set of threads or processes runs in parallel before the barrier, and the program as a whole relies on their results after the barrier. For example, this design pattern gets used in MPI, and for that matter it is implied in MapReduce as well -- the reduce tasks must wait for all the map tasks to complete, though in practice Apache Hadoop is considerably more convoluted. We'll stick with the convenient fictions about it, for now.
Exelixi implements a barrier pattern through the workers' task queues, using an Event, a JoinableQueue, and two REST endpoints:
- REST /queue/wait
- REST /queue/join
The former endpoint causes the UnitOfWork to wait until all shards have finished sending task queue requests.
The latter endpoint causes the UnitOfWork to wait via a join on the task queue, in other words to wait until the queue empties.
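Here's a rough sketch of how the worker-side barrier can be built from those two primitives. The handler names and the bare Event/JoinableQueue usage are illustrative, not a copy of Exelixi's actual code.

```python
# sketch of the worker-side barrier primitives -- illustrative, not Exelixi's actual implementation
from gevent.event import Event
from gevent.queue import JoinableQueue

task_queue = JoinableQueue()
done_sending = Event()   # set once this shard has finished sending task requests to its peers


def handle_queue_wait():
    """REST /queue/wait: block until this shard has finished sending its task requests."""
    done_sending.wait()
    return "OK"


def handle_queue_join():
    """REST /queue/join: block until the local task queue has drained completely."""
    task_queue.join()
    return "OK"


def perform_task_loop():
    """consume tasks from the queue, marking each one done as it completes."""
    while True:
        task = task_queue.get()
        try:
            task()                  # each task is a callable in this sketch
        finally:
            task_queue.task_done()
```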
So the UnitOfWork calls a set of REST endpoints --
which it implements internally -- to orchestrate the distributed processing across the cluster based on a barrier pattern.
Think of this as being implemented in three steps:
- Initiate tasks on the workers, which causes them to communicate and initiate tasks within each other
- Cycle through the list of workers calling /queue/wait, to ensure that the workers have finished initiating tasks within each other and are now all draining their task queues internally
- Cycle through the list of workers again to call /queue/join, at which point all of the tasks are guaranteed to have completed
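From the UnitOfWork side, those three steps might look roughly like the following. The HTTP verbs and the helper shape are assumptions for illustration, not Exelixi's actual protocol.

```python
# sketch of the three-step barrier from the UnitOfWork side -- the HTTP verbs
# and helper shape here are assumptions, not Exelixi's actual wire protocol
import requests


def run_barrier(workers, initiate_path):
    # 1. initiate tasks on the workers, which in turn send tasks to each other
    for worker in workers:
        requests.post(f"http://{worker}/{initiate_path}")

    # 2. confirm every worker has finished initiating tasks and is draining its queue
    for worker in workers:
        requests.get(f"http://{worker}/queue/wait")

    # 3. wait until every worker's task queue has emptied -- all tasks have completed
    for worker in workers:
        requests.get(f"http://{worker}/queue/join")
```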
This is kinda sorta like a two-phase commit.
But not really.
It's actually much more like a generalization of MapReduce.
One of the odd aspects of teaching MapReduce to larger numbers of people is watching their reactions to the canonical WordCount example...
Most people think of counting "words" in SQL terms --
using GROUP BY and COUNT() --
so the MapReduce example at first looks totally bassackwards.
However, the SQL approach doesn't scale very well, while the MapReduce approach does.
What the MapReduce approach fails to clarify is that it's a gross oversimplification of something truly elegant and powerful: a barrier pattern applied to using monoids.
Once a student understands how all the WordCount.emit(token, 1) quackery of a MapReduce example is really just a good use of monoids, and that the "map" and "reduce" phases are examples of barriers, then this whole business of parallel processing on a cluster begins to click.
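For the curious, here's a tiny sketch of that monoid view of WordCount: counts under addition form a monoid (an associative combine plus an identity element), so partial counts computed in parallel before the barrier can be merged in any order afterward. The example data is made up, of course.

```python
# sketch: WordCount as a monoid -- partial counts merge associatively, so the
# "reduce" side of the barrier can combine shard results in any order
from collections import Counter


def count_words(chunk):
    """the per-shard work performed before the barrier."""
    return Counter(chunk.split())


shards = ["to be or not to be", "that is the question"]
partials = [count_words(chunk) for chunk in shards]   # "map" phase, parallelizable

total = Counter()                                     # the monoid identity
for partial in partials:                              # "reduce" phase, after the barrier
    total += partial                                  # associative combine

print(total.most_common(3))
```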
Back to Exelixi...
The tasks themselves are defined within the UnitOfWork, and the workers are entirely agnostic toward that.
Overall, this strategy is much more flexible than MapReduce, and it allows parallel processing with some lightweight messaging among the shards.
All of the above essentially implements placeholders for the real work to be performed... In the specific case of GA workflows, the big idea is that we take this framework and turn the hash ring of workers into one big, distributed content addressable memory.
Yes, that is correct.
In a GA, an Individual represents a candidate solution defined primarily by a feature set, sometimes called "chromosomes".
In Exelixi, that feature set is an arbitrary Python data structure which can be serialized and deserialized.
We also keep track of the generation in which the Individual evolved, and its fitness value, but the feature set is the unique part.
So we also generate a unique key for each Individual, constructed from a SHA-3 digest of its feature set data.
Individual instances then can be serialized and persisted in durable storage as key/value pairs, where the value consists of a tuple: [fitness value, generation, feature set]
More importantly, the key for an Individual gets used along with the hash ring --
based on hashring.HashRing --
to determine the worker on which that Individual belongs.
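A minimal sketch of that keying scheme follows, using Python's standard hashlib for the SHA-3 digest. The serialization via JSON and the commented-out get_node() lookup are assumptions about the hash ring API, not necessarily how hashring.HashRing works.

```python
# sketch: keying an Individual by a SHA-3 digest of its serialized feature set,
# then locating its home shard on the hash ring (the get_node() call is an
# assumption about the hash ring API, not necessarily hashring.HashRing's)
import hashlib
import json


def individual_key(feature_set):
    """serialize the feature set and hash it into a stable key."""
    payload = json.dumps(feature_set, sort_keys=True).encode("utf-8")
    return hashlib.sha3_256(payload).hexdigest()


feature_set = {"x": 3, "y": 7}
key = individual_key(feature_set)

# the value persisted in durable storage, per the tuple described above
value = [0.42, 1, feature_set]     # [fitness value, generation, feature set]

# hypothetical hash ring lookup: which worker owns this Individual?
# worker = hash_ring.get_node(key)
```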
We use the term reify to describe the process of sending an Individual to another worker, somewhere across the cluster.
This process provides a way to partition the data into shards, balancing the distributed processing across the hash ring of workers.
It also implies that we can look up any possible Individual --
whether it has been evaluated already or not --
simply by a lookup on the hash ring and a REST endpoint call to the appropriate worker.
In the default GA implementation in Exelixi, the UnitOfWork is subclassed by service.Population, as a distributed population of service.Individual instances.
Each shard holds one partition, one subset of the overall population.
Note that the most expensive part of GAs, potentially, is the evaluation of Individuals by calculating a fitness function.
The strategy here leverages the worker task queues to perform that processing in parallel.
When a Population shard receives a reify request from a Population shard on another worker, it puts the request into its worker task queue.
Meanwhile, the Population.perform_task() method is consuming from the task queue, so these requests run in parallel as gevent coroutines.
Another cute trick is to use a trie at the point of receiving a reify request.
The Population performs a lookup of the requested key using datrie.Trie, which determines whether the shard has ever seen the Individual before.
If so, ignore the request.
If not, place the request into the task queue.
When the task actually gets performed, add its key to the local trie.
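A minimal sketch of that receive-side dedup check follows. The datrie constructor and usage shown here are a plausible reading of the library, and the fitness() call on the UnitOfWork is hypothetical; none of this is a copy of Exelixi's actual code.

```python
# sketch of the receive-side dedup check -- the datrie usage is a plausible
# reading, and uow.fitness() is hypothetical, not Exelixi's actual code
import string
import datrie

seen = datrie.Trie(string.hexdigits)    # keys are hex digests of feature sets


def receive_reify(key, feature_set, task_queue):
    """handle a reify request arriving from another shard."""
    if key in seen:
        return                           # already seen here; ignore the request
    task_queue.put((key, feature_set))   # otherwise enqueue the evaluation


def perform_reify(key, feature_set, uow):
    """when the task actually runs, evaluate fitness and record the key in the local trie."""
    fitness = uow.fitness(feature_set)   # hypothetical fitness function on the UnitOfWork
    seen[key] = True
    return fitness
```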
On the one hand, we'll avoid duplicating effort --
which is an important thing when working in parallel at scale.
On the other hand, here's another data structure to checkpoint.
It's also on our bucket list, waiting for a round tuit.
Great. So to review, the steps are:
- An Individual gets bred or mutated on the local worker, but not yet evaluated
- The local worker performs a lookup using the hash ring to determine whether to keep the Individual or send it over to another worker
- In the latter case, a reify request on the remote worker checks a trie to decide whether to ignore the task or enqueue it
- If it enqueues, the remote worker places the reify task on its task queue
- Meanwhile the perform_task() method consumes from the task queue, evaluating the fitness function for each reify request and updating the local trie
In other words, we've turned the Exelixi framework into one big, distributed content addressable memory, running parallel processing to reify and evaluate Individual instances on shards scattered across an Apache Mesos cluster.
Boom!
The default GA implemented in Exelixi uses a strategy called elitism to select the Individual instances to preserve and breed for the next generation.
The mechanism for elitism must be distributed across the shards.
So here's another cute trick...
The UnitOfWork for the default GA implements a distributed histogram.
In other words, each shard receives a REST endpoint call to calculate a partial histogram for its Individual instances which are "alive".
Essentially, in this process each Population shard takes the fitness value for each Individual, which will be a floating point number in the range of 0.0 .. 1.0, and rounds it to N digits.
See the uow.UnitOfWorkFactory.hist_granularity parameter for how many digits.
Then the Population shard performs binning, by aggregating these rounded fitness values into a local histogram.
The orchestrating Population instance on the framework then collects each partial histogram and aggregates them into a histogram for the entire population.
It's a poor-man's method of quantile analysis, frankly, but it scales well.
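Here's a minimal sketch of the per-shard binning and the framework-side aggregation. The constant, the helper name, and the sample values are stand-ins for illustration; the real granularity comes from uow.UnitOfWorkFactory.hist_granularity.

```python
# sketch: per-shard partial histogram of fitness values, then aggregation --
# a simplified stand-in for the actual Exelixi implementation
from collections import Counter

HIST_GRANULARITY = 3    # stand-in for uow.UnitOfWorkFactory.hist_granularity


def partial_histogram(shard_fitness_values):
    """what each Population shard computes for its 'alive' Individuals."""
    return Counter(round(f, HIST_GRANULARITY) for f in shard_fitness_values)


# the orchestrating Population instance aggregates the partial histograms
partials = [
    partial_histogram([0.911, 0.642, 0.640]),
    partial_histogram([0.913, 0.377]),
]

aggregate = Counter()
for partial in partials:
    aggregate += partial
```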
A simple scan of the bins in the aggregated histogram allows the orchestrating Population instance to determine some fitness value as a cutoff metric...
perhaps the top 20 percent,
depending on the uow.UnitOfWorkFactory.selection_rate parameter.
That cutoff then gets broadcast back out to the shards via a REST endpoint.
Individual instances with a fitness value above the cutoff metric get selected,
that is to say they live on as "parents" for the next generation.
The elites.
Others will either be mutated,
depending on the uow.UnitOfWorkFactory.mutation_rate parameter,
or they will die.
Bummer.
Life is like that.
The point is to allow this calculation to be parallelized without its complexity growing in an unbounded way.
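Continuing the histogram sketch above, the cutoff scan might look roughly like this, with the selection rate standing in for uow.UnitOfWorkFactory.selection_rate and the function shape being an assumption rather than Exelixi's actual logic:

```python
# sketch: scan the aggregated histogram from the top bin down until the
# selection rate is reached -- a simplified stand-in for Exelixi's cutoff logic
def fitness_cutoff(aggregate_histogram, selection_rate=0.2):
    total = sum(aggregate_histogram.values())
    selected = 0

    for fitness_bin in sorted(aggregate_histogram, reverse=True):
        selected += aggregate_histogram[fitness_bin]
        if selected >= total * selection_rate:
            return fitness_bin      # gets broadcast back to the shards via REST
    return 0.0                      # degenerate case: select everyone
```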
In summary, pseudocode for the UnitOfWork orchestration process looks like:
```
# claim and configure the workers
send HashRing REST("shard/config", { prefix, shard_id, UnitOfWorkFactory })
send HashRing REST("ring/init", { HashRing })

# initialize the distributed GA population
send HashRing REST("pop/init")
send HashRing REST("pop/gen")

while True:
    # wait on barrier
    send HashRing REST("queue/wait")
    send HashRing REST("queue/join")

    if current_gen == UnitOfWorkFactory.n_gen:
        break

    for partial_histogram in send HashRing REST("pop/hist"):
        aggregate histogram of fitness values

    if UnitOfWorkFactory.test_termination():
        break

    determine fitness_cutoff from aggregate histogram
    send HashRing REST("pop/next", { current_gen, fitness_cutoff })
    current_gen += 1

# report the best Individuals as the final results
send HashRing REST("shard/stop")
```
Quite a collection of cute tricks and cheap hacks, to be certain.
