Parallelize - a simple yet powerful high-level interface to multiprocessing

When I was developing Cloudtask, I discovered that none of the interfaces in the Python multiprocessing module were powerful enough for my needs, so I had to roll my own. The result is the generically useful multiprocessing_utils module in turnkey-pylib, which, from my totally subjective perspective, provides a far better interface to parallelization than the built-in multiprocessing interfaces.

Like any good tool, the new primitive is extremely robust: during testing it survived insults more severe than anything you should run into in practice (e.g., running millions of inputs through hundreds of workers while randomly killing off workers).

Though the implementation was quite tricky to get working right, all of the underlying complexity is abstracted away behind a trivially simple interface:

  1. You pass a sequence of callables to Parallelize. In the simplest case these will be functions, but they can also be callable instances.

  2. You get back a Parallelize instance, which is itself a callable. Calling it with input arguments queues a parallelized call in one of the underlying processes.

  3. The .wait() method waits for all queued execution to finish.

  4. The .stop() method stops parallel execution. It can be called after .wait() has completed, or mid-run (e.g., when handling an exception such as Ctrl-C), in which case you get back a list of the aborted inputs.

  5. The .results attribute is an array holding the return values. A background thread updates it in real time, as soon as the underlying functions in the various processes finish executing and return.

    After .wait() returns, its length will equal the number of calls made to the Parallelize instance.

Parallelized function example:

import os
import time

from multiprocessing_utils import Parallelize

def sleeper(seconds):
    print("pid %d: sleeping %d seconds" % (os.getpid(), seconds))
    time.sleep(seconds)
    return seconds

sleeper = Parallelize([sleeper] * 50)
print("Allocated workers")

try:
    for i in range(250):
        sleeper(1)

    print("Queued parallelized invocations. Ctrl-C to abort!")
    sleeper.wait()

    print("Finished sleeping...")

finally:
    aborted = sleeper.stop()
    if aborted:
        print("len(aborted) = %d" % len(aborted))
        print("len(aborted) + len(results) = %d" % (len(aborted) + len(sleeper.results)))

    print("len(sleeper.results) = %d" % len(sleeper.results))

Callable class example:

import os

from multiprocessing_utils import Parallelize

class ExampleExecutor:
    def __init__(self, name):
        self.name = name
        print("__init__(%s)" % self.name)

    def __call__(self, *args):
        print("pid %d: self.name=%s __call__(%r)" % (os.getpid(), self.name, args))
        return args

p = Parallelize([ExampleExecutor(str(i)) for i in range(50)])

try:
    for i in range(250):
        p(i)

    p.wait()
    print("p.results: %r" % (p.results,))

finally:
    p.stop()
    print("after stop")

Note that in the example above ExampleExecutor is initialized serially in a single process before the array of initialized callable executors is passed to Parallelize.

In some cases you may need to parallelize the initialization of the executor as well (e.g., because the initialization is IO bound and takes significant time). Parallelize supports this with a mechanism that defers initialization of the executor class until it is "inside" Parallelize.
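The core idea behind such a deferral mechanism can be pictured as a tiny wrapper that captures the class and its constructor arguments, and only instantiates the class on first call, so construction happens in the worker process rather than in the parent. The sketch below is my illustration of that idea, not the library's actual Deferred implementation, and the Greeter class is a made-up example:

```python
class Deferred:
    """Illustrative sketch: wraps a callable class, deferring its
    construction until the wrapper is first called."""

    def __init__(self, cls, *args, **kwargs):
        self.cls = cls
        self.args = args
        self.kwargs = kwargs
        self._instance = None

    def __call__(self, *args, **kwargs):
        # construct the wrapped class on first call -- in Parallelize
        # terms, "inside" the worker process rather than in the parent
        if self._instance is None:
            self._instance = self.cls(*self.args, **self.kwargs)
        return self._instance(*args, **kwargs)

class Greeter:
    def __init__(self, name):
        self.name = name            # runs only when first called

    def __call__(self, arg):
        return "%s:%s" % (self.name, arg)

d = Deferred(Greeter, "worker")     # Greeter.__init__ has NOT run yet
print(d("hello"))                   # first call constructs Greeter, then calls it
```

The real mechanism also has to coordinate with Parallelize so that initialization happens concurrently across workers (and so .executors can wait for it to finish), but the deferral itself is just this capture-now, construct-later pattern.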

Example of callable class with deferred initialization:

import os
import random

from multiprocessing_utils import Parallelize, Deferred

class ExampleExecutor:
    def __init__(self, name):
        self.name = name
        self.pid = os.getpid()

        ## if we want to test what happens to failed initializations
        #if random.randint(0, 1):
        #    raise Exception

        print("%s.__init__: pid %d" % (self.name, self.pid))

    def __call__(self, *args):
        print("%s.__call__(%r)" % (self.name, args))
        return args

    def __del__(self):
        # re-import: module globals may already be torn down when __del__
        # runs at interpreter shutdown
        import os
        print("%s.__del__: self.pid=%d, os.getpid=%d" % (self.name, self.pid, os.getpid()))

deferred = []
for i in range(50):
    deferred_executor = Deferred(ExampleExecutor, i)
    deferred.append(deferred_executor)

p = Parallelize(deferred)

try:
    # if necessary, .executors will wait for initialization of
    # deferred executor instances to finish
    print("len(p.executors) = %d" % len(p.executors))

    for executor in p.executors:
        print(executor.pid)

    for i in range(250):
        p(i)

    p.wait()
    print("p.results: %r" % (p.results,))

finally:
    p.stop()
    print("after stop")
