I had a large pile of data to feed through an expensive function. The concurrent.futures module in the Python standard library has always worked well for me as a simple way to farm out work across threads or processes.
Update: this code swallows exceptions. An improved version is at Do a pile of work better.
For example, if my work function is “workfn”, and it takes tuples of arguments as produced by “argsfn()”, this is how you could run them all:
for args in argsfn():
    workfn(*args)
This is how you would run them on a number of threads:
import concurrent.futures as cf

with cf.ThreadPoolExecutor(max_workers=nthreads) as executor:
    for args in argsfn():
        executor.submit(workfn, *args)
But this submits all of the tasks up-front, so the executor's internal queue holds every pending invocation at once. With millions of work invocations, that could be a problem. I wanted a way to feed the tasks in as they are processed, to keep the queue small. And I wanted a progress bar.
I started from this Stack Overflow answer, added in tqdm for a progress bar, and made this:
import concurrent.futures as cf

from tqdm import tqdm

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.

    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(nthreads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times.  Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples.  `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        nthreads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    """
    # Count the work items. This is why argsfn must be re-runnable.
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.1) as progressbar:
        if nthreads:
            # Keep at most 2*nthreads futures in flight at once.
            limit = 2 * nthreads
            not_done = set()
            with cf.ThreadPoolExecutor(max_workers=nthreads) as executor:
                for args in argsfn():
                    if len(not_done) >= limit:
                        # Too much work outstanding: wait for something
                        # to finish before submitting more.
                        done, not_done = wait_first(not_done)
                        progressbar.update(len(done))
                    not_done.add(executor.submit(workfn, *args))
                # Everything is submitted; drain the remaining futures.
                while not_done:
                    done, not_done = wait_first(not_done)
                    progressbar.update(len(done))
        else:
            # No threads: just do the work serially.
            for args in argsfn():
                workfn(*args)
                progressbar.update(1)
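To make the calling convention concrete, here is a hypothetical usage sketch. The workfn and argsfn here are stand-ins for illustration, not my real program:

import time

def workfn(path, size):
    """A stand-in unit of work: pretend to process one file."""
    time.sleep(0.01)

def argsfn():
    """A stand-in argument producer.

    It must yield the same tuples each time it is called, since
    do_work calls it once to count and again to submit.
    """
    for i in range(1000):
        yield (f"file{i}.dat", i)

do_work(nthreads=8, argsfn=argsfn, workfn=workfn)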
There might be a better way. I don’t like the duplication of the wait_first call, but this works, and produces the right results.
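About the exception problem mentioned in the update: submit returns a Future, and a Future silently stores any exception its callable raised until someone calls .result() on it. A minimal sketch of one way to surface them (just an illustration, not the improved version linked above):

def report_done(done, progressbar):
    # Hypothetical helper: calling .result() on each completed future
    # re-raises any exception the work function raised.
    for future in done:
        future.result()
    progressbar.update(len(done))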
BTW: my actual work function spawned subprocesses, which is why a thread pool worked to give me parallelism. A pure-Python work function wouldn’t get a speed-up this way, but a ProcessPoolExecutor could help.
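For CPU-bound pure-Python work, a sketch of the process-based variant might look like this (a minimal illustration with a stand-in workfn, not tested against the code above; note that the work function and its arguments must be picklable):

import concurrent.futures as cf

def workfn(n):
    # A stand-in for CPU-bound pure-Python work.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Processes sidestep the GIL, so this actually runs in parallel.
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(workfn, [10**6] * 8))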
Comments
I really, really wanted it to be shorter, but without with_backpressure, the imap_unordered() call will consume the whole iterator immediately (it's lazy only in some ways, see https://bugs.python.org/issue40110 for details).
Similar to your example, pure-Python code can be sped up by using multiprocessing.Pool instead.