ppft package documentation

ppft: distributed and parallel Python

About ppft

ppft is a friendly fork of Parallel Python (pp). ppft extends Parallel Python to provide packaging and distribution with pip and setuptools, support for Python 3, and enhanced serialization using dill.source. ppft uses Parallel Python to provide mechanisms for the parallel execution of Python code on SMP (systems with multiple processors or cores) and clusters (computers connected via network).

Software written in Python is used in a broad range of categories, including business logic, data analysis, and scientific calculations. This, together with the wide availability of SMP computers (multi-processor or multi-core) and clusters (computers connected via network), creates demand for parallel execution of Python code.

The most common way to write parallel applications for SMP computers is to use threads. However, the Python interpreter uses the GIL (Global Interpreter Lock) for internal bookkeeping, and the GIL allows only one Python byte-code instruction to execute at a time, even on an SMP computer. Parallel Python overcomes this limitation, and provides a simple way to write parallel Python applications. Internally, processes and IPC (Inter-Process Communication) are used to organize parallel computations. Parallel Python is written so that the details and complexity of IPC are handled internally, and the calling application just submits jobs and retrieves the results. Software written with Parallel Python works in parallel on many computers connected via a local network or the Internet. Cross-platform portability and dynamic load-balancing allow Parallel Python to parallelize computations efficiently even on heterogeneous and multi-platform clusters. Visit http://www.parallelpython.com for further information on Parallel Python.
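
The process-plus-IPC idea described above can be illustrated with the standard library alone. The following is a minimal sketch, not ppft's implementation: the helper name sum_of_squares_in_subprocess and the WORKER_SOURCE program are hypothetical, and pickled data over stdin/stdout stands in for the IPC channel.

```python
import pickle
import subprocess
import sys

# Program run by the child interpreter: read pickled input from stdin,
# compute, and write the pickled result to stdout.
WORKER_SOURCE = r"""
import pickle, sys
numbers = pickle.load(sys.stdin.buffer)
pickle.dump(sum(x * x for x in numbers), sys.stdout.buffer)
"""


def sum_of_squares_in_subprocess(numbers):
    """Run the computation in a separate Python process (hypothetical helper)."""
    completed = subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE],
        input=pickle.dumps(numbers),   # arguments travel to the child as bytes
        stdout=subprocess.PIPE,        # the result travels back the same way
        check=True,
    )
    return pickle.loads(completed.stdout)


print(sum_of_squares_in_subprocess([1, 2, 3]))  # prints 14
```

Because the work happens in a separate interpreter, the child has its own GIL and does not block the parent's byte-code execution.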

ppft is part of pathos, a Python framework for heterogeneous computing. ppft is in active development, so any user feedback, bug reports, comments, or suggestions are highly appreciated. A list of issues is located at https://github.com/uqfoundation/ppft/issues, with a legacy list maintained at https://uqfoundation.github.io/project/pathos/query.

Major Features

ppft provides:

  • parallel execution of Python code on SMP and clusters

  • easy-to-understand job-based parallelization

  • automatic detection of the number of effective processors

  • dynamic processor allocation (at runtime)

  • low overhead for jobs with the same function (through transparent caching)

  • dynamic load balancing (jobs are distributed at runtime)

  • fault-tolerance (if a node fails, tasks are rescheduled on the others)

  • auto-discovery of computational resources

  • dynamic allocation of computational resources

  • SHA based authentication for network connections

  • enhanced serialization, using dill.source
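
As an illustration of the SHA based authentication listed above, here is a generic challenge-response sketch using HMAC-SHA256 from the standard library. It is an assumption-laden example of the general pattern, not ppft's wire protocol; the function names make_challenge, make_response, and verify_response are hypothetical.

```python
import hashlib
import hmac
import secrets


def make_challenge():
    """Server side: produce a random one-time challenge string."""
    return secrets.token_hex(16)


def make_response(challenge, secret):
    """Client side: prove knowledge of the secret without sending it."""
    return hmac.new(
        secret.encode("utf-8"), challenge.encode("utf-8"), hashlib.sha256
    ).hexdigest()


def verify_response(challenge, response, secret):
    """Server side: recompute the digest and compare in constant time."""
    expected = make_response(challenge, secret)
    return hmac.compare_digest(expected, response)


challenge = make_challenge()
good = make_response(challenge, "a-non-trivial-secret")
bad = make_response(challenge, "wrong-secret")
print(verify_response(challenge, good, "a-non-trivial-secret"))  # prints True
print(verify_response(challenge, bad, "a-non-trivial-secret"))   # prints False
```

The shared secret never crosses the network in this pattern; only the random challenge and the digest do.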

Current Release

The latest released version of ppft is available from https://pypi.org/project/ppft.

ppft is distributed under a 3-clause BSD license, and is a fork of pp-1.6.6.

Development Version

You can get the latest development version with all the shiny new features at https://github.com/uqfoundation/ppft.

If you have a new contribution, please submit a pull request.

Installation

ppft can be installed with pip:

$ pip install ppft

To include enhanced serialization, using dill.source, install:

$ pip install ppft[dill]

If Parallel Python is already installed, it should be uninstalled before ppft is installed – otherwise, import pp may point to the original and not to the ppft fork.

Requirements

ppft requires:

  • python (or pypy), >=3.8

  • setuptools, >=42

Optional requirements:

  • dill, >=0.3.8

Basic Usage

ppft is a fork of the Parallel Python package (pp) that has been converted from Python 2 to Python 3, made PEP 517 compliant, and augmented with dill.source. For simple parallel execution, first create a job Server where the number of available nodes is autodetected:

>>> import ppft as pp
>>> job_server = pp.Server()

The number of nodes can be specified by passing an int as the first argument when creating the server (e.g. Server(4) creates a server with four nodes). The server uses submit to execute jobs in parallel. submit takes a function, a tuple of the arguments to pass to the function, a tuple of any functions used but not imported in the function, and a tuple of any modules required to produce the function:

>>> import math
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(min, (3.2, 10.0, 7.5), (), ())
>>> f3 = job_server.submit(sum, ([1,2,3],), (), ())

The functions are serialized by dill.source (as opposed to dill), by extracting and passing the source code to the server. The server compiles and executes the source code, and then calls the function with the arguments passed in the tuple. Any function and module dependencies are imported before exec is called on the source code. Results are retrieved by calling the object returned from submit:

>>> f1()
1.0
>>> f2()
3.2
>>> f3()
6
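
The serialize-by-source flow described above (import the required modules, exec the source, then call the function with the argument tuple) can be sketched with the standard library. This is a simplified illustration under stated assumptions, not the code ppft runs: run_from_source and JOB_SOURCE are hypothetical names, and the source string is written by hand rather than extracted with dill.source.

```python
import importlib

# Source text that would be shipped to the worker (hand-written here).
JOB_SOURCE = """
def hypotenuse(a, b):
    return math.sqrt(a * a + b * b)
"""


def run_from_source(source, func_name, args, modules=()):
    """Rebuild a function from its source and call it (hypothetical helper)."""
    namespace = {}
    # Import module dependencies before the source is executed.
    for module_name in modules:
        namespace[module_name] = importlib.import_module(module_name)
    # Compile and execute the source so the function exists in the namespace.
    exec(compile(source, "<job>", "exec"), namespace)
    # Call the rebuilt function with the argument tuple.
    return namespace[func_name](*args)


print(run_from_source(JOB_SOURCE, "hypotenuse", (3, 4), ("math",)))  # prints 5.0
```

The sketch also shows why the modules tuple matters: without 'math' in the namespace, the call would fail with a NameError.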

Job server execution statistics can be printed with:

>>> job_server.print_stats()
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         3 |        100.00 |       0.0051 |     0.001684 | local
Time elapsed since server creation 148.48280715942383
0 active tasks, 4 cores

ppft can also execute jobs on remote computational nodes, if a ppserver is first started on the node. Here the ppserver is started on 127.0.0.1, and will listen on port 35000:

$ ppserver -a -p 35000

Then, locally, instantiate a Server with the connection information for the remote node, submit some jobs, and retrieve the results:

>>> job_server = pp.Server(ppservers=('127.0.0.1:35000',))
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(math.sin, (0,), (), ('math',))
>>> f3 = job_server.submit(math.sin, (-math.pi/2,), (), ('math',))
>>> f1(),f2(),f3()
(1.0, 0.0, -1.0)
>>>

However, the stats show that all of the jobs were run locally:

>>> job_server.print_stats()
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         3 |        100.00 |       0.0024 |     0.000812 | local
Time elapsed since server creation 31.755322217941284
0 active tasks, 4 cores

This is because we didn’t specify the number of local nodes. The number of nodes is specified both in the ppserver and in the local job Server. Thus, the above actually autobalances between 4 local nodes and 4 remote nodes. The former are naturally going to be preferred; however, if the local server is flooded with jobs, some will get sent to the remote ppserver, and that will be reflected in the stats. To run all jobs remotely, set the number of local nodes to zero:

>>> job_server = pp.Server(0, ppservers=('127.0.0.1:35000',))
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(math.sin, (0,), (), ('math',))
>>> f3 = job_server.submit(math.sin, (-math.pi/2,), (), ('math',))
>>> f1(),f2(),f3()
(1.0, 0.0, -1.0)
>>> job_server.print_stats()
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         3 |        100.00 |       0.0016 |     0.000518 | 127.0.0.1:35000
Time elapsed since server creation 15.123202800750732
0 active tasks, 0 cores
>>>

Get help on the command line options for ppserver:

$ ppserver --help
Parallel Python Network Server (pp-1.7.6.8)
Usage: ppserver [-hdar] [-f format] [-n proto] [-c config_path] [-i interface] [-b broadcast] [-p port] [-w nworkers] [-s secret] [-t seconds] [-k seconds] [-P pid_file]

Options:
-h                 : this help message
-d                 : set log level to debug
-f format          : log format
-a                 : enable auto-discovery service
-r                 : restart worker process after each task completion
-n proto           : protocol number for pickle module
-c path            : path to config file
-i interface       : interface to listen
-b broadcast       : broadcast address for auto-discovery service
-p port            : port to listen
-w nworkers        : number of workers to start
-s secret          : secret for authentication
-t seconds         : timeout to exit if no connections with clients exist
-k seconds         : socket timeout in seconds
-P pid_file        : file to write PID to

To print server stats send SIGUSR1 to its main process (unix only).

Due to the security concerns always use a non-trivial secret key.
Secret key set by -s switch will override secret key assigned by
pp_secret variable in .pythonrc.py
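
Following the advice above to use a non-trivial secret key, the sketch below generates a random secret and assembles a ppserver command line from flags shown in the help text (-p, -w, -s, -t). The helper build_ppserver_command is hypothetical, and the command is only printed, not executed.

```python
import secrets
import shlex


def build_ppserver_command(port, nworkers, secret, timeout=None):
    """Assemble a ppserver argument list (hypothetical helper)."""
    command = ["ppserver", "-p", str(port), "-w", str(nworkers), "-s", secret]
    if timeout is not None:
        # -t: exit after this many seconds without client connections.
        command += ["-t", str(timeout)]
    return command


# 16 random bytes rendered as 32 hexadecimal characters.
secret = secrets.token_hex(16)
command = build_ppserver_command(35000, 4, secret, timeout=600)

# Print a shell-safe version of the command for copy and paste.
print(shlex.join(command))
```

The same secret would then be passed as the secret argument when creating the local Server, so that both ends of the connection agree.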

More Information

Probably the best way to get started is to look at the documentation at http://ppft.rtfd.io. Also, you can see a set of example scripts in ppft.tests. You can run the test suite with python -m ppft.tests. ppft will create and execute jobs on local workers (automatically created using python -u -m ppft). Additionally, remote servers can be created with ppserver (or python -m ppft.server), and then jobs can be distributed to remote workers. See --help for more details on how to configure a server. Please feel free to submit a ticket on GitHub, or ask a question on Stack Overflow (@Mike McKerns). If you would like to share how you use ppft in your work, please send an email (to mmckerns at uqfoundation dot org).

Citation

If you use ppft to do research that leads to publication, we ask that you acknowledge use of ppft by citing the following in your publication:

M.M. McKerns, L. Strand, T. Sullivan, A. Fang, M.A.G. Aivazis,
"Building a framework for predictive science", Proceedings of
the 10th Python in Science Conference, 2011;
http://arxiv.org/pdf/1202.1056

Michael McKerns and Michael Aivazis,
"pathos: a framework for heterogeneous computing", 2010- ;
https://uqfoundation.github.io/project/pathos

Please see https://uqfoundation.github.io/project/pathos or http://arxiv.org/pdf/1202.1056 for further information.

exception DestroyedServerError

Bases: RuntimeError

class Server(ncpus='autodetect', ppservers=(), secret=None, restart=False, proto=2, socket_timeout=3600)

Bases: object

Parallel Python SMP execution server class

Creates Server instance

  • ncpus - the number of worker processes to start on the local computer, if parameter is omitted it will be set to the number of processors in the system

  • ppservers - list of active parallel python execution servers to connect with

  • secret - passphrase for network connections, if omitted a default passphrase will be used. It’s highly recommended to use a custom passphrase for all network connections.

  • restart - restart the worker process after each task completion

  • proto - protocol number for pickle module

  • socket_timeout - socket timeout in seconds, which is the maximum time a remote job could be executed. Increase this value if you have long running jobs or decrease if connectivity to remote ppservers is often lost.

With ncpus = 1 all tasks are executed consecutively. For the best performance either use the default “autodetect” value or set ncpus to the total number of processors in the system.
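
For readers who want to choose ncpus explicitly, one way to obtain a processor count with the standard library is sketched below. This is an assumption about a reasonable approach, not the detection logic ppft uses; the function name usable_cpu_count is hypothetical.

```python
import os


def usable_cpu_count():
    """Return a count of CPUs this process may use (hypothetical helper)."""
    # On platforms that support it, respect the process CPU affinity mask.
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # os.cpu_count() may return None, so fall back to a single CPU.
    return os.cpu_count() or 1


print(usable_cpu_count())
```

The resulting integer could be passed as the first argument to Server, as in the Server(4) example earlier in this document.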

__add_to_active_tasks(num)

Updates the number of active tasks

__connect()

Connects to all remote ppservers

__del__()
__detect_ncpus()

Detects the number of effective CPUs in the system

__dumpsfunc(funcs, modules)

Serializes functions and modules

__find_modules(prefix, dict)

Recursively finds all the modules in dict

__gentid()

Generates a unique job ID number

__get_source(func)

Fetches source of the function

__scheduler()

Schedules jobs for execution

__stat_add_job(node)

Increments job count on the node

__stat_add_time(node, time_add)

Updates total runtime on the node

__update_active_rworkers(id, count)

Updates list of active rworkers

_run_local(job, sfunc, sargs, worker)

Runs a job locally

_run_remote(job, sfunc, sargs, rworker)

Runs a job remotely

connect1(host, port, persistent=True)

Connects to a remote ppserver specified by host and port

default_port = 60000
default_secret = 'epo20pdosl;dksldkmm'
destroy()

Kills ppworkers and closes open files

get_active_nodes()

Returns active nodes as a dictionary [keys - nodes, values - ncpus]

get_ncpus()

Returns the number of local worker processes (ppworkers)

get_stats()

Returns job execution statistics as a dictionary

insert(sfunc, sargs, task=None)

Inserts function into the execution queue. It’s intended for internal use only (in ppserver).

print_stats()

Prints job execution statistics. Useful for benchmarking on clusters

set_ncpus(ncpus='autodetect')

Sets the number of local worker processes (ppworkers)

ncpus - the number of worker processes; if the parameter is omitted, it will be set to the number of processors in the system

submit(func, args=(), depfuncs=(), modules=(), callback=None, callbackargs=(), group='default', globals=None)

Submits function to the execution queue

  • func - function to be executed

  • args - tuple with arguments of the ‘func’

  • depfuncs - tuple with functions which might be called from ‘func’

  • modules - tuple with module names to import

  • callback - function which will be called with argument list equal to callbackargs+(result,) as soon as calculation is done

  • callbackargs - additional arguments for callback function

  • group - job group, is used when wait(group) is called to wait for jobs in a given group to finish

  • globals - dictionary from which all modules, functions and classes will be imported, for instance: globals=globals()
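
The callback calling convention described above, callbackargs+(result,), can be shown in isolation. The sketch below is a pure-Python illustration of that argument order; finish_job and record are hypothetical names and no job server is involved.

```python
def finish_job(result, callback=None, callbackargs=()):
    """Invoke the callback the way the convention describes (hypothetical)."""
    if callback is not None:
        # The result is appended after the user-supplied callback arguments.
        callback(*(callbackargs + (result,)))


collected = []


def record(label, value):
    """Example callback: 'label' comes from callbackargs, 'value' is the result."""
    collected.append((label, value))


finish_job(6, callback=record, callbackargs=("sum",))
print(collected)  # prints [('sum', 6)]
```

A callback therefore needs one more positional parameter than the length of callbackargs, with the result arriving last.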

wait(group=None)

Waits for all jobs in a given group to finish. If group is omitted waits for all jobs to finish

class Template(job_server, func, depfuncs=(), modules=(), callback=None, callbackargs=(), group='default', globals=None)

Bases: object

Template class

Creates Template instance

  • job_server - pp server for submitting jobs

  • func - function to be executed

  • depfuncs - tuple with functions which might be called from ‘func’

  • modules - tuple with module names to import

  • callback - function which will be called with argument list equal to callbackargs+(result,) as soon as calculation is done

  • callbackargs - additional arguments for callback function

  • group - job group, is used when wait(group) is called to wait for jobs in a given group to finish

  • globals - dictionary from which all modules, functions and classes will be imported, for instance: globals=globals()

submit(*args)

Submits function with *args arguments to the execution queue
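
The template idea (fix the function and its options once, then supply only the arguments on each submit) can be sketched without a real job server. Everything below is hypothetical and for illustration only: InlineServer is a stand-in that runs the function immediately, and MiniTemplate is not ppft's Template class.

```python
class InlineServer:
    """Hypothetical stand-in for a job server; runs each job immediately."""

    def submit(self, func, args=(), group="default"):
        result = func(*args)
        # Mimic the documented pattern of calling the returned object
        # to retrieve the result.
        return lambda: result


class MiniTemplate:
    """Bind a server, a function, and options once (hypothetical sketch)."""

    def __init__(self, job_server, func, **options):
        self.job_server = job_server
        self.func = func
        self.options = options

    def submit(self, *args):
        # Only the arguments vary from call to call.
        return self.job_server.submit(self.func, args, **self.options)


template = MiniTemplate(InlineServer(), max, group="demo")
jobs = [template.submit(1, 5), template.submit(7, 2)]
print([job() for job in jobs])  # prints [5, 7]
```

This keeps repeated submissions of the same function short, since the shared settings are stated once.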

class _RWorker(host, port, secret, server, message, persistent, socket_timeout)

Bases: CSocketTransport

Remote worker class

Initializes remote worker

__del__()

Closes connection with remote server

connect(message=None)

Connects to a remote server

class _Statistics(ncpus, rworker=None)

Bases: object

Class to hold execution statistics for a single node

Initializes statistics for a node

class _Task(server, tid, callback=None, callbackargs=(), group='default')

Bases: object

Class describing a single task (job)

Initializes the task

_Task__unpickle()

Unpickles the result of the task

__call__(raw_result=False)

Retrieves result of the task

finalize(sresult)

Finalizes the task.

For internal use only

wait()

Waits for the task

class _Worker(restart_on_free, pickle_proto)

Bases: object

Local worker class

Initializes local worker

command = ['/home/docs/checkouts/readthedocs.org/user_builds/ppft/envs/latest/bin/python', '-u', '-m', 'ppft', '2>/dev/null']
free()

Frees local worker

restart()

Restarts local worker

start()

Starts local worker

stop()

Stops local worker

citation()

print citation

getname(obj)
importable(func)
license()

print license
