ppft package documentation
ppft: distributed and parallel Python
About Ppft
ppft is a friendly fork of Parallel Python (pp). ppft extends Parallel Python to provide packaging and distribution with pip and setuptools, support for Python 3, and enhanced serialization using dill.source. ppft uses Parallel Python to provide mechanisms for the parallel execution of Python code on SMP (systems with multiple processors or cores) and clusters (computers connected via network).
Software written in Python finds applications in a broad range of categories, including business logic, data analysis, and scientific calculations. Together with the wide availability of SMP computers (multi-processor or multi-core) and clusters (computers connected via network), this creates demand for the parallel execution of Python code.
The most common way to write parallel applications for SMP computers is to use threads. However, the Python interpreter uses the GIL (Global Interpreter Lock) for internal bookkeeping, and the GIL only allows one Python byte-code instruction to execute at a time, even on an SMP computer. Parallel Python overcomes this limitation, and provides a simple way to write parallel Python applications. Internally, processes and IPC (Inter Process Communications) are used to organize parallel computations. Parallel Python is written so that the details and complexity of IPC are handled internally, and the calling application just submits jobs and retrieves the results. Software written with Parallel Python works in parallel on many computers connected via a local network or the Internet. Cross-platform portability and dynamic load-balancing allow Parallel Python to parallelize computations efficiently even on heterogeneous and multi-platform clusters. Visit http://www.parallelpython.com for further information on Parallel Python.
ppft is part of pathos, a Python framework for heterogeneous computing.
ppft is in active development, so any user feedback, bug reports, comments, or suggestions are highly appreciated. A list of issues is located at https://github.com/uqfoundation/ppft/issues, with a legacy list maintained at https://uqfoundation.github.io/project/pathos/query.
Major Features
ppft provides:
parallel execution of Python code on SMP and clusters
easy-to-understand job-based parallelization
automatic detection of the number of effective processors
dynamic processor allocation (at runtime)
low overhead for jobs with the same function (through transparent caching)
dynamic load balancing (jobs are distributed at runtime)
fault-tolerance (if a node fails, tasks are rescheduled on the others)
auto-discovery of computational resources
dynamic allocation of computational resources
SHA based authentication for network connections
enhanced serialization, using dill.source
Current Release
The latest released version of ppft is available from https://pypi.org/project/ppft.
ppft is distributed under a 3-clause BSD license, and is a fork of pp-1.6.6.
Development Version
You can get the latest development version with all the shiny new features at https://github.com/uqfoundation/ppft.
If you have a new contribution, please submit a pull request.
Installation
ppft can be installed with pip:
$ pip install ppft
To include enhanced serialization, using dill.source, install:
$ pip install ppft[dill]
If Parallel Python is already installed, it should be uninstalled before ppft is installed; otherwise, import pp may point to the original and not to the ppft fork.
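If in doubt, you can check which installation import pp resolves to (a quick diagnostic using standard Python, not part of the ppft API):
$ python -c "import pp; print(pp.__file__)"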
Requirements
ppft requires:
python (or pypy), >=3.8
setuptools, >=42
Optional requirements:
dill, >=0.3.9
Basic Usage
ppft is a fork of the Parallel Python package (pp) that has been converted from Python 2 to Python 3, made PEP 517 compliant, and augmented with dill.source. For simple parallel execution, first create a job Server, where the number of nodes available is autodetected:
>>> import ppft as pp
>>> job_server = pp.Server()
The number of nodes can be specified by passing an int as the first argument when creating the server (i.e. Server(4) creates a server with four nodes).
The server uses submit to execute jobs in parallel. submit takes a function, a tuple of the arguments to pass to the function, a tuple of any functions used but not imported in the function, and a tuple of any modules required to produce the function:
>>> import math
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(min, (3.2, 10.0, 7.5), (), ())
>>> f3 = job_server.submit(sum, ([1,2,3],), (), ())
The functions are serialized by dill.source (as opposed to dill), by extracting and passing the source code to the server. The server compiles and executes the source code, and then calls the function with the arguments passed in the tuple. Any function and module dependencies are imported before exec is called on the source code. Results are retrieved by calling the object returned from submit:
>>> f1()
1.0
>>> f2()
3.2
>>> f3()
6
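The same mechanism works for functions defined in your own code. Below is a minimal sketch (squared and sum_of_squares are illustrative names, not part of ppft); since sum_of_squares calls squared without importing it, squared is passed in the depfuncs tuple:
>>> def squared(x):
...     return x*x
...
>>> def sum_of_squares(n):
...     return sum(squared(i) for i in range(n))
...
>>> job = job_server.submit(sum_of_squares, (5,), (squared,), ())
>>> job()
30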
Job server execution statistics can be printed with:
>>> job_server.print_stats()
Job execution statistics:
job count | % of all jobs | job time sum | time per job | job server
3 | 100.00 | 0.0051 | 0.001684 | local
Time elapsed since server creation 148.48280715942383
0 active tasks, 4 cores
ppft can also execute jobs on remote computational nodes, if a ppserver is first started on the node. Here the ppserver is started on 127.0.0.1, and will listen on port 35000:
$ ppserver -a -p 35000
Then, locally, instantiate a Server with the connection information for the remote node, submit some jobs, and retrieve the results:
>>> job_server = pp.Server(ppservers=('127.0.0.1:35000',))
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(math.sin, (0,), (), ('math',))
>>> f3 = job_server.submit(math.sin, (-math.pi/2,), (), ('math',))
>>> f1(),f2(),f3()
(1.0, 0.0, -1.0)
>>>
However, the stats show that all of the jobs were run locally:
>>> job_server.print_stats()
Job execution statistics:
job count | % of all jobs | job time sum | time per job | job server
3 | 100.00 | 0.0024 | 0.000812 | local
Time elapsed since server creation 31.755322217941284
0 active tasks, 4 cores
This is because the number of local nodes was not specified. The number of nodes is specified both in the ppserver and in the local job Server. Thus, the above actually "autobalances" between 4 local nodes and 4 remote nodes. The former is naturally going to be preferred; however, if the local server is flooded with jobs, some will get sent to the remote ppserver, and that will be reflected in the stats. To run all jobs remotely, set the number of local nodes to zero:
>>> job_server = pp.Server(0, ppservers=('127.0.0.1:35000',))
>>> f1 = job_server.submit(math.sin, (math.pi/2,), (), ('math',))
>>> f2 = job_server.submit(math.sin, (0,), (), ('math',))
>>> f3 = job_server.submit(math.sin, (-math.pi/2,), (), ('math',))
>>> f1(),f2(),f3()
(1.0, 0.0, -1.0)
>>> job_server.print_stats()
Job execution statistics:
job count | % of all jobs | job time sum | time per job | job server
3 | 100.00 | 0.0016 | 0.000518 | 127.0.0.1:35000
Time elapsed since server creation 15.123202800750732
0 active tasks, 0 cores
>>>
Get help on the command line options for ppserver:
$ ppserver --help
Parallel Python Network Server (pp-1.7.6.9)
Usage: ppserver [-hdar] [-f format] [-n proto] [-c config_path] [-i interface] [-b broadcast] [-p port] [-w nworkers] [-s secret] [-t seconds] [-k seconds] [-P pid_file]
Options:
-h : this help message
-d : set log level to debug
-f format : log format
-a : enable auto-discovery service
-r : restart worker process after each task completion
-n proto : protocol number for pickle module
-c path : path to config file
-i interface : interface to listen
-b broadcast : broadcast address for auto-discovery service
-p port : port to listen
-w nworkers : number of workers to start
-s secret : secret for authentication
-t seconds : timeout to exit if no connections with clients exist
-k seconds : socket timeout in seconds
-P pid_file : file to write PID to
To print server stats send SIGUSR1 to its main process (unix only).
Due to the security concerns always use a non-trivial secret key.
Secret key set by -s switch will override secret key assigned by
pp_secret variable in .pythonrc.py
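For example, a shared passphrase can be supplied on both ends, with -s on the server and secret= on the local Server (a minimal sketch; 'mysecret' is a placeholder):
$ ppserver -a -p 35000 -s mysecret
>>> job_server = pp.Server(ppservers=('127.0.0.1:35000',), secret='mysecret')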
More Information
Probably the best way to get started is to look at the documentation at http://ppft.rtfd.io. Also, you can see a set of example scripts in ppft.tests. You can run the test suite with python -m ppft.tests.
ppft will create and execute jobs on local workers (automatically created using python -u -m ppft). Additionally, remote servers can be created with ppserver (or python -m ppft.server), and then jobs can be distributed to remote workers. See --help for more details on how to configure a server.
Please feel free to submit a ticket on github, or ask a question on stackoverflow (@Mike McKerns). If you would like to share how you use ppft in your work, please send an email (to mmckerns at uqfoundation dot org).
Citation
If you use ppft to do research that leads to publication, we ask that you acknowledge use of ppft by citing the following in your publication:
M.M. McKerns, L. Strand, T. Sullivan, A. Fang, M.A.G. Aivazis,
"Building a framework for predictive science", Proceedings of
the 10th Python in Science Conference, 2011;
http://arxiv.org/pdf/1202.1056
Michael McKerns and Michael Aivazis,
"pathos: a framework for heterogeneous computing", 2010- ;
https://uqfoundation.github.io/project/pathos
Please see https://uqfoundation.github.io/project/pathos or http://arxiv.org/pdf/1202.1056 for further information.
- exception DestroyedServerError
Bases: RuntimeError
- class Server(ncpus='autodetect', ppservers=(), secret=None, restart=False, proto=2, socket_timeout=3600)
Bases: object
Parallel Python SMP execution server class
Creates Server instance
ncpus - the number of worker processes to start on the local computer; if omitted, it is set to the number of processors in the system
ppservers - list of active parallel python execution servers to connect with
secret - passphrase for network connections; if omitted, a default passphrase is used. It is highly recommended to use a custom passphrase for all network connections.
restart - restart the worker process after each task completion
proto - protocol number for the pickle module
socket_timeout - socket timeout in seconds, which is the maximum time a remote job may take to execute. Increase this value for long-running jobs, or decrease it if connectivity to remote ppservers is often lost.
With ncpus = 1 all tasks are executed sequentially. For the best performance, either use the default "autodetect" value or set ncpus to the total number of processors in the system.
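All of the parameters above can also be passed by keyword, as in this rough sketch (the values shown are arbitrary, for illustration only):
>>> job_server = pp.Server(ncpus=2, restart=True, socket_timeout=60)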
- __add_to_active_tasks(num)
Updates the number of active tasks
- __connect()
Connects to all remote ppservers
- __del__()
- __detect_ncpus()
Detects the number of effective CPUs in the system
- __dumpsfunc(funcs, modules)
Serializes functions and modules
- __find_modules(prefix, dict)
recursively finds all the modules in dict
- __gentid()
Generates a unique job ID number
- __get_source(func)
Fetches source of the function
- __scheduler()
Schedules jobs for execution
- __stat_add_job(node)
Increments job count on the node
- __stat_add_time(node, time_add)
Updates total runtime on the node
- __update_active_rworkers(id, count)
Updates list of active rworkers
- _run_local(job, sfunc, sargs, worker)
Runs a job locally
- _run_remote(job, sfunc, sargs, rworker)
Runs a job remotely
- connect1(host, port, persistent=True)
Connects to a remote ppserver specified by host and port
- default_port = 60000
- default_secret = 'epo20pdosl;dksldkmm'
- destroy()
Kills ppworkers and closes open files
- get_active_nodes()
Returns active nodes as a dictionary [keys - nodes, values - ncpus]
- get_ncpus()
Returns the number of local worker processes (ppworkers)
- get_stats()
Returns job execution statistics as a dictionary
- insert(sfunc, sargs, task=None)
Inserts function into the execution queue. It’s intended for internal use only (in ppserver).
- print_stats()
Prints job execution statistics. Useful for benchmarking on clusters
- set_ncpus(ncpus='autodetect')
Sets the number of local worker processes (ppworkers)
- ncpus - the number of worker processes; if the parameter is omitted, it will be set to the number of processors in the system
- submit(func, args=(), depfuncs=(), modules=(), callback=None, callbackargs=(), group='default', globals=None)
Submits function to the execution queue
func - function to be executed
args - tuple with arguments of the 'func'
depfuncs - tuple with functions which might be called from 'func'
modules - tuple with module names to import
callback - function which will be called with argument list equal to callbackargs+(result,) as soon as calculation is done
callbackargs - additional arguments for the callback function
group - job group; used when wait(group) is called to wait for jobs in a given group to finish
globals - dictionary from which all modules, functions and classes will be imported, for instance: globals=globals()
- wait(group=None)
Waits for all jobs in a given group to finish. If group is omitted waits for all jobs to finish
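As a rough sketch of how callback, group, and wait fit together (collect and 'sines' are illustrative names, not part of the API; this assumes the job_server and math import from the examples above):
>>> results = []
>>> def collect(value):
...     results.append(value)
...
>>> for x in (0, math.pi/2):
...     job_server.submit(math.sin, (x,), (), ('math',),
...                       callback=collect, group='sines')
...
>>> job_server.wait('sines')
>>> sorted(results)
[0.0, 1.0]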
- class Template(job_server, func, depfuncs=(), modules=(), callback=None, callbackargs=(), group='default', globals=None)
Bases: object
Template class
Creates Template instance
job_server - pp server for submitting jobs
func - function to be executed
depfuncs - tuple with functions which might be called from 'func'
modules - tuple with module names to import
callback - function which will be called with argument list equal to callbackargs+(result,) as soon as calculation is done
callbackargs - additional arguments for the callback function
group - job group; used when wait(group) is called to wait for jobs in a given group to finish
globals - dictionary from which all modules, functions and classes will be imported, for instance: globals=globals()
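For example, a Template can fix the function and its dependencies once, so repeated jobs only pass the arguments. A minimal sketch, reusing the job_server and math import from the examples above, and assuming Template.submit forwards its positional arguments to the wrapped function:
>>> template = pp.Template(job_server, math.sin, modules=('math',))
>>> jobs = [template.submit(x) for x in (0, math.pi/2, -math.pi/2)]
>>> [job() for job in jobs]
[0.0, 1.0, -1.0]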
- class _RWorker(host, port, secret, server, message, persistent, socket_timeout)
Bases: CSocketTransport
Remote worker class
Initializes remote worker
- __del__()
Closes connection with remote server
- connect(message=None)
Connects to a remote server
- class _Statistics(ncpus, rworker=None)
Bases: object
Class to hold execution statistics for a single node
Initializes statistics for a node
- class _Task(server, tid, callback=None, callbackargs=(), group='default')
Bases: object
Class describing single task (job)
Initializes the task
- _Task__unpickle()
Unpickles the result of the task
- __call__(raw_result=False)
Retrieves result of the task
- finalize(sresult)
Finalizes the task.
For internal use only
- wait()
Waits for the task
- class _Worker(restart_on_free, pickle_proto)
Bases: object
Local worker class
Initializes local worker
- command = ['/home/docs/checkouts/readthedocs.org/user_builds/ppft/envs/latest/bin/python', '-u', '-m', 'ppft', '2>/dev/null']
- free()
Frees local worker
- restart()
Restarts local worker
- start()
Starts local worker
- stop()
Stops local worker
- citation()
print citation
- getname(obj)
- importable(func)
- license()
print license