6. Distributed Task Manager Overview

DTM is a distributed task manager which works over many communication layers. Currently, all modern MPI implementations are supported through mpi4py, and an experimental TCP backend is available.

Warning

As of version 0.2, DTM is still in alpha stage, meaning that specific bugs may appear; the communication and operation layers are quite stable, though. Feel free to report bugs and performance issues if you can isolate a problematic situation.

6.1. DTM Main Features

DTM has some very interesting features:

  • Offers an interface similar to Python’s multiprocessing module
  • Automatically balances the load between workers (and therefore supports heterogeneous networks and tasks of varying durations)
  • Supports an arbitrary number of workers without changing a byte in the program
  • Abstracts the communication management away from the user (the same program can be run over MPI, TCP or multiprocessing just by changing the communication manager)
  • Provides easy-to-use parallelization paradigms
  • Offers a trace mode, which can be used to tune the performance of the running program (still experimental)

6.2. Introductory example

First, let's take a look at a very simple distribution example. The sequential program we want to parallelize reads as follows:

def op(x):
    return x + 1./x     # Or any operation

if __name__ == "__main__":
    nbrs = range(1, 1000)
    results = map(op, nbrs)

This program simply applies an arbitrary operation to each item of a list. Although it is a trivial program (and therefore would not benefit from parallelization), let's assume we want to distribute the workload over a cluster. We just import the task manager and use the DTM parallel version of map():

from deap import dtm

def op(x):
    return x + 1./x     # Or any operation

def main():
    nbrs = range(1, 1000)
    results = dtm.map(op, nbrs)

dtm.start(main)

And we are done! This program can now run over MPI, with an arbitrary number of processors, without changing anything else. We just run it like a normal MPI program (here with OpenMPI), replacing * with the desired number of workers:

$ mpirun -n * python myProgram.py

The operation done in the op() function can be virtually any operation, including other DTM calls (which in turn may also spawn sub-tasks, and so on).
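
For instance, here is a short sketch (with arbitrary helper names) where each task distributes its own sub-tasks through a nested dtm.map() call:

from deap import dtm

def inner(x):
    return x * x

def op(n):
    # Each task may itself call DTM; the sub-tasks are scheduled
    # on the available workers like any other task
    return sum(dtm.map(inner, range(n)))

def main():
    results = dtm.map(op, range(1, 100))

dtm.start(main)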

Note

DTM requires the main execution code to be encapsulated in a function, so that it can control which worker starts the execution.

6.3. Functions documentation

class deap.dtm.manager.Control

Control is the main DTM class. The dtm object you receive when you use from deap import dtm is a proxy over an instance of this class.

Most of its methods are used by your program, within the execution tasks; however, two of them (start() and setOptions()) MUST be called in the MainThread (i.e. the thread started by the Python interpreter).

As this class is instantiated directly in the module, its initializer takes no arguments.

apply(function, *args, **kwargs)

Equivalent of the apply() built-in function. It blocks until the result is ready. Given that this blocks, apply_async() is better suited for performing work in parallel. Additionally, the passed-in function is only executed in one of the workers of the pool.

apply_async(function, *args, **kwargs)

A non-blocking variant of the apply() method which returns an AsyncResult object.
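
As a short sketch of the difference between the two calls (op() being any task function):

# Blocking call: the result is available as soon as apply() returns
resultSync = dtm.apply(op, 42)

# Non-blocking call: an AsyncResult object is returned immediately,
# and the actual result is retrieved later with get()
asyncObj = dtm.apply_async(op, 42)
# [...] do other work
resultAsync = asyncObj.get()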

filter(function, iterable)

Same behavior as the built-in filter(). The filtering is done locally, but the computation is distributed.

getWorkerId()

Return a unique ID for the current worker. Depending on the communication manager type, it can be virtually any immutable Python type.

Note

With MPI, the value returned is the MPI slot number.

imap(function, iterable, chunksize=1)

An equivalent of itertools.imap().

The chunksize argument can be used to tell DTM how many elements should be computed at the same time. For very long iterables, using a large value for chunksize can make the job complete much faster than using the default value of 1.

imap_unordered(function, iterable, chunksize=1)

Not implemented yet.

map(function, *iterables, **kwargs)

A parallel equivalent of the map() built-in function. It blocks until the result is ready. This method chops the iterables into a number of chunks determined by DTM in order to get the most efficient use of the workers. It takes any number of iterables (though it will shrink all of them to the length of the shortest one), and any other kwargs, which are transmitted as-is to the target function.
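
For instance, a sketch of a call with two iterables of different lengths and an extra keyword argument forwarded to the (hypothetical) target function:

def weightedSum(a, b, factor=1.0):
    return (a + b) * factor

def main():
    xs = range(10)
    ys = range(5)       # shorter iterable: only the first 5 pairs are computed
    results = dtm.map(weightedSum, xs, ys, factor=0.5)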

map_async(function, iterable, callback=None)

A non-blocking variant of the map() method which returns an AsyncResult object.

Note

As of version 0.2, callback is not implemented.

repeat(function, n, *args, **kwargs)

Call function n times, with the given positional and keyword arguments. Return a list containing the results.
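
For instance, a small sketch running the same call ten times in parallel (runTrial() is a hypothetical function):

def runTrial(size, scale=1.0):
    # [...] some computation depending on the arguments
    return size * scale

def main():
    # Ten parallel executions of runTrial(50, scale=2.0)
    results = dtm.repeat(runTrial, 10, 50, scale=2.0)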

setOptions(*args, **kwargs)

Set a DTM global option.

Warning

This function must be called BEFORE start(). It is also the user's responsibility to ensure that the same options are set on every worker.

Currently, the supported options are:
  • communicationManager: can be deap.dtm.mpi4py (default) or deap.dtm.commManagerTCP.
  • loadBalancer: currently, only the default PDB is available.
  • printSummary: if set, DTM will print a task execution summary at the end (mean execution time of each task, how many tasks each worker executed, ...).
  • setTraceMode: if set, enables a special DTM trace mode. In this mode, DTM logs all its activity in XML files (one per worker). This is mainly intended for DTM debugging purposes, but it can also be used for profiling.

This function can be called more than once. Any unknown parameter will have no effect.
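
For instance, a short sketch enabling the execution summary (the body of the program is omitted):

from deap import dtm

def main():
    # [...] your parallel code
    pass

dtm.setOptions(printSummary=True)    # must be set before dtm.start()
dtm.start(main)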

start(initialTarget, *args, **kwargs)

Start the execution with the target initialTarget. Calling this function creates and launches the first task on the root worker (defined by the communication manager; for instance, with MPI, the root worker is the one with rank 0).

Warning

This function must be called only ONCE, and after the target has been parsed by the Python interpreter.

testAllAsync()

Check whether all pending asynchronous tasks are done. It does not block if this is not the case, but simply returns False.

waitForAll()

Wait for all pending asynchronous results. When this function returns, DTM guarantees that all ready() calls on asynchronous tasks will return True.

class deap.dtm.manager.AsyncResult(control, waitingInfo, taskKey)

The class of the result returned by map_async() and apply_async().

get()

Return the result when it arrives.

Note

This is a blocking call: the caller will wait in this function until the result is ready. To check the availability of the result, use ready().

ready()

Return whether the asynchronous task has completed.

successful()

Return whether the task completed without error. Will raise AssertionError if the result is not ready.

wait()

Wait until the result is available.
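
Putting these methods together, a short sketch of a typical usage pattern (op() being any task function):

asyncResults = dtm.map_async(op, range(100))

if not asyncResults.ready():
    # The tasks are still running; do something else, or...
    asyncResults.wait()              # ...block until they are all done

if asyncResults.successful():        # no exception occurred in the child tasks
    values = asyncResults.get()      # returns immediately, the results are ready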

6.4. DTM launching

DTM can support many communication environments. The only real restriction on the choice of a communication backend is that it must provide access from each worker to every other worker (that is, worker 0 must be able to communicate directly with all other workers, and likewise for workers 1, 2, etc.). Currently, two main backends are available: one using MPI through the mpi4py layer, and another using TCP (with SSH for the launch process).

If you already have a functional MPI installation, you should choose the mpi4py backend, as it can provide better performance in some cases and is easier to configure. On the other hand, MPI implementations may cause strange errors when you use low-level system operations (such as fork). In any case, both backends should be ready for production use, and as switching backends is as easy as changing a single line in your script, this choice should not be an issue.

6.4.1. Launch DTM with the mpi4py backend

When this backend is used, DTM delegates the start-up process to MPI. In this way, any MPI option can be used. For instance:

mpirun -n 32 -hostfile myHosts -npernode 4 -bind-to-core python yourScript.py --yourScriptParams ...

This will launch your script on 32 workers, distributed over the hosts listed in ‘myHosts’, without exceeding 4 workers per host, and will bind each DTM worker to a CPU core. The bind-to-core option can provide a good performance improvement in some environments, and does not affect the behavior of DTM at all.

6.4.2. Launch DTM with the pure-TCP backend

The TCP backend includes a launcher which uses SSH to start remote DTM workers. Therefore, your execution environment must provide SSH access to every host, and a shared file system such as NFS (or, at least, ensure that the script you want to execute and the DTM libraries are located at a common path on every worker). You also have to provide a host file which follows the same syntax as MPI host files, for instance:

firstComputer.myNetwork.com slots=4
otherComputer.myNetwork.com slots=6
221.32.118.3 slots=4

Warning

The hostnames / addresses you write in this file must be reachable and resolvable on every machine used. For instance, putting ‘localhost’ in this file among other remote hosts will fail, because each host will try to bind ports for ‘localhost’ (which will fail, as this is not a network-accessible address).

Then you can launch DTM with this file:

python myScript.py --dtmTCPhosts=myHostFile

Note

Do not forget to explicitly set the communication manager to deap.dtm.commManagerTCP:

dtm.setOptions(communicationManager="deap.dtm.commManagerTCP")

This backend can also be useful if you want to launch local jobs. If your hostfile contains only ‘localhost’ or ‘127.0.0.1’, DTM will start all the workers locally, without using SSH.
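
Putting it together, a minimal sketch of a script prepared for the TCP backend (the task is the same as in the introductory example):

from deap import dtm

def op(x):
    return x + 1./x

def main():
    results = dtm.map(op, range(1, 1000))

dtm.setOptions(communicationManager="deap.dtm.commManagerTCP")
dtm.start(main)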

6.5. Troubleshooting and Pitfalls

Here are the most common errors and problems reported with DTM. Some of them are caused by misuse of the task manager; others are limitations of the libraries and programs used by DTM.

6.5.1. Isolation per worker

In DTM, the atomic independent working units are called workers. They are separate processes, and do not share any information other than what is explicitly communicated. Therefore, two variables used in different workers cannot interfere with each other, and your program should not rely on such sharing. Thus, one has to be extremely careful about which data is global, and which is local to a task. For instance, consider the following program:

from deap import dtm

foo = [0]

def bar(n):
    foo[0] += n
    return foo[0]

def main():
    listNbr = range(30)
    results = dtm.map(bar, listNbr)
    print("Results : " + str(results))

dtm.start(main)

Although it is syntactically correct, it may not produce the result you are expecting. On a serial evaluation (using the built-in map() function), it simply produces a list containing the cumulative sums of the numbers from 0 to 29 (a rather odd approach, but it works):

Results : [0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136, 153, 171, 190, 210, 231, 253, 276, 300, 325, 351, 378, 406, 435]

But with DTM, as foo is not shared between workers, the program generates completely unpredictable output, for instance:

Results : [0, 1, 2, 5, 5, 10, 11, 17, 25, 20, 30, 36, 48, 43, 57, 72, 88, 65, 83, 107, 127, 104, 126, 150, 150, 175, 176, 203, 203, 232]
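
A safe rewrite is to make the task function depend only on its arguments, so the result does not depend on which worker runs it. For instance, a sketch producing the same cumulative sums without any global state:

from deap import dtm

def bar(n):
    # Depends only on the argument, so every worker computes the same value
    return sum(range(n + 1))

def main():
    listNbr = range(30)
    results = dtm.map(bar, listNbr)
    print("Results : " + str(results))

dtm.start(main)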

The reverse problem should also be taken into account. If an object keeps internal state, it is generally not a good idea to make it global (accessible from all tasks). For instance, if you create a log file like this:

from deap import dtm

logfile = open('myLogFile', 'w+')

def task1(args):
    # [...]
    logfile.write("Log task 1")
    # [...]

def task2(args):
    # [...]
    logfile.write("Log task 2")
    # [...]

def main():
    listNbr = range(100)
    statusTask1 = dtm.map_async(task1, listNbr)
    statusTask2 = dtm.map_async(task2, listNbr)
    # [...]

dtm.start(main)

You may experience some unusual output, as the writes from task1 and task2 will probably overlap (because they use the same resource whenever, by chance, they are executed on the same worker, which will probably happen). When in doubt, use local variables.
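
If you really need per-task logging, one possible workaround (a sketch only, the naming scheme is arbitrary) is to open the resource locally inside each task, for instance one log file per worker built from getWorkerId():

from deap import dtm

def task1(args):
    # Open, write and close locally; the file name is unique to this worker
    logName = 'myLogFile_' + str(dtm.getWorkerId())
    with open(logName, 'a') as logfile:
        logfile.write("Log task 1\n")
    # [...]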

6.5.2. Exceptions

When a Python exception occurs during a task execution, DTM catches it (and tries to run another task on this worker). The exception is then raised in the parent task. If there is no such task (i.e. the task where the exception occurred is the root task), it is thrown and DTM stops its execution.

The moment when the exception is raised in the parent task depends on the child task type: if it is a synchronous call (like apply() or map()), it is raised when the parent wakes up (i.e. as if it had been raised by the DTM function itself). If it is an asynchronous call (like apply_async() or map_async()), the exception is raised when the parent task performs a get() on the AsyncResult object. Also, successful() will return False if an exception occurred, without raising it.
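
For an asynchronous call, here is a sketch of how such an exception can be handled at the parent level, assuming a hypothetical task function mayFail() that may raise a ValueError:

def main():
    asyncResults = dtm.map_async(mayFail, range(100))
    asyncResults.wait()

    if not asyncResults.successful():
        # At least one child task raised an exception;
        # get() re-raises it here, in the parent task
        try:
            asyncResults.get()
        except ValueError as err:
            print("A child task failed : " + str(err))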

Note

When DTM catches an exception, it outputs a warning on the standard error output stating the exception type and arguments. This warning does not mean that the exception has already been raised in the parent task (actually, in some situations, that may take a long time if all workers are busy); it is logged only for informational purposes.

6.5.3. MPI and threads

Recent MPI implementations support four levels of threading: single, funneled, serialized and multiple. However, many environments (like the InfiniBand backend) do not support any level other than single. In that case, if you use the mpi4py backend, make sure that mpi4py does not initialize the MPI environment in any mode other than single (as DTM has been designed so that only the communication thread makes MPI calls, this mode works well even if there is more than one active thread in a DTM worker). This setting can be changed in the file "site-packages/mpi4py/rc.py", with the variable thread_level.
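
Depending on your mpi4py version and on when the MPI module is actually loaded, you may also be able to apply the same setting at runtime instead of editing rc.py, by assigning mpi4py.rc before anything imports mpi4py.MPI; treat the following as a sketch rather than a guaranteed interface:

import mpi4py
mpi4py.rc.thread_level = 'single'   # request the single threading level at MPI initialization

from deap import dtm                # imported afterwards, so MPI is initialized with this setting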

6.5.4. Cooperative multitasking

DTM works on a cooperative philosophy. There is no preemption system to interrupt an executing thread (and eventually start one with a higher priority). When a task begins its execution, the worker will execute it until the task returns or makes a synchronous DTM call, like map or apply. If the task enters an infinite loop or reaches a deadlock state, then the worker will also deadlock (it will still be able to transfer its other tasks to other workers, though). DTM is not a fair scheduler, and thus cannot guarantee any execution delay or prevent every case of starvation; it just tries to reach the best execution time given the information it has about the tasks.

6.5.5. Pickling

When dealing with non-trivial programs, you may experience error messages like this:

PicklingError: Can't pickle <class '__main__.****'>: attribute lookup __main__.**** failed

This is because DTM makes use of the Python pickle module in order to serialize data and function calls (so they can be transferred from one worker to another). Although the pickle module is very powerful (it handles recursive and shared objects, multiple references, etc.), it has some limitations. Most of the time, a pickling error can be easily solved by adding __setstate__() and __getstate__() methods to the problematic class (see the Python documentation for more details about the pickling protocol).
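
A minimal sketch of such a class, assuming the pickling error comes from an unpicklable attribute (here a hypothetical open file handle):

class TaskData(object):
    def __init__(self, values, logName):
        self.values = values
        self.logName = logName
        self.logFile = open(logName, 'a')    # file objects cannot be pickled

    def __getstate__(self):
        # Copy the instance dictionary and drop the unpicklable attribute
        state = self.__dict__.copy()
        del state['logFile']
        return state

    def __setstate__(self, state):
        # Restore the picklable attributes, then rebuild the file handle
        self.__dict__.update(state)
        self.logFile = open(self.logName, 'a')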

This may also be used to accelerate pickling: by defining your own pickling methods, you can speed up the pickling operation (the same way you can speed up the deepcopy operation by defining your own __deepcopy__() method). If your program uses thousands of objects from the same class, it may be worthwhile.

Also take note of the following Python interpreter limitations:

  • In Python 2.6, partial functions cannot be pickled; Python 2.7 works fine.

  • Lambda functions cannot be pickled in any Python version (up to 3.2). Users should use normal functions, or tools from functools, or ensure that their parallelization never needs to explicitly transfer a lambda function (not its result, but the lambda object itself) from one worker to another.

  • Functions are usually never pickled: they are just referenced, and they must be importable in the unpickling environment, even if they are standard functions (defined with the keyword def). For instance, consider this (faulty) code:

    from deap import dtm
    def main():
        def bar(n):
            return n**2
    
        listNbr = range(30)
        results = dtm.map(bar, listNbr)
    
    dtm.start(main)
    

On execution, this will produce an error like:

TypeError: can't pickle function objects

This is because the pickler is not able to find a global reference to the function bar(). The same restriction applies to classes and modules.
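
The fix is simply to move the function to module level, so that the unpickling environment can import it by name:

from deap import dtm

def bar(n):         # defined at module level, so it can be referenced by the pickler
    return n**2

def main():
    listNbr = range(30)
    results = dtm.map(bar, listNbr)

dtm.start(main)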

6.5.6. Asynchronous tasks and active waiting

DTM supports both synchronous and asynchronous tasks (the latter do not block the parent task). For asynchronous tasks, DTM returns an object with an API similar to Python's multiprocessing.pool.AsyncResult. This object offers some convenient functions to wait on a result, or to test whether the task is done. However, some issues may appear in DTM with a program like this:

from deap import dtm
def myTask(param):
    # [...] Long treatment
    return param+2

def main():
    listTasks = range(100)
    asyncReturn = dtm.map_async(myTask, listTasks)

    while not asyncReturn.ready():
        continue

    # Other instructions...

dtm.start(main)

This active waiting (looping while the result is not available), although syntactically valid, might produce unexpected “half-deadlocks”. Keep in mind that DTM is not a preemptive system: if you keep a worker busy only to wait on another, the load balancer may not work properly. For instance, it may consider that, as the parent task is still working, the asynchronous child tasks can still wait, or that one of the child tasks should remain on the same worker as its parent to balance the load between workers. As the load balancer is not completely deterministic, the child tasks should eventually complete, but after an unpredictable delay.

It is much better to do something like this:

def main():
    listTasks = range(100)
    asyncReturn = dtm.map_async(myTask, listTasks)

    asyncReturn.wait()  # or dtm.waitForAll()
    # [...]

By calling one of these functions, you explicitly notify DTM that you are now waiting for those asynchronous tasks, and are willing to let the worker do other jobs in the meantime. The call will then return to your parent function when all the results are available.