Concurrent Programming

Concurrency

Concurrent programming means running multiple pieces of code whose lifetimes overlap: one starts before another ends, even if they never actually execute at the same instant.

Parallel programming means executing multiple code statements at the same time (on multiple CPUs).

Parallel programming is a form of concurrent programming.

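Concurrent programming is useful for I/O-bound tasks (programs that spend most of their time waiting on networks, disks, or other I/O). For CPU-bound tasks, you’ll need to go even further and practice parallel programming.

CPython, the standard Python interpreter, has a Global Interpreter Lock (GIL) that allows only one thread to execute Python bytecode at a time, so a single Python process cannot run Python code in parallel (the optional free-threaded builds introduced in Python 3.13 are the exception). With just one Python process, we can only accomplish concurrent programming.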
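Here’s a minimal sketch of concurrency helping with waiting. It uses time.sleep as a stand-in for a network request (an assumption made so the example runs offline; CPython releases the GIL during a sleep just as it does during blocking I/O). Five threads that each wait 0.2 seconds should finish in about 0.2 seconds total rather than a full second, though exact timings vary by machine.

```python
import threading
import time


def fake_io(delay):
    # time.sleep stands in for a blocking network call; CPython releases
    # the GIL while a thread sleeps, so other threads can run meanwhile.
    time.sleep(delay)


def run_threads(count, delay):
    """Start `count` threads that each block for `delay` seconds; return elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


elapsed = run_threads(count=5, delay=0.2)
print("5 overlapping waits took {:.2f} seconds".format(elapsed))
```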

We’re going to look at a number of different types of concurrent (and some parallel) programming in Python.

Example Program

We’ll be using an example program which fetches many URLs for sponsors of the Python Software Foundation and checks to see whether these URLs mention the word “Python”.

We’ll be using this urls.txt file for these programs:

https://research.fb.com/
https://www.capitalone.com/
http://www.fastly.com/
http://www.rackspace.com/
https://aws.amazon.com/
http://www.google.com/
https://www.techatbloomberg.com/
http://www.redhat.com/
http://www.indeed.com/
http://www.microsoft.com/
https://travis-ci.com/
https://www.heroku.com/home/
https://www.union-investment.de/
https://zapier.com/about
http://edgestreamlp.com/
http://www.enthought.com/
https://hostingfacts.com/
https://www.aqr.com/
http://adimian.com/
https://m.do.co/c/783434964889
https://signalfx.com/
http://www.activestate.com/python
http://www.oreilly.com/
http://www.bizrate.com/
http://www.eyesopen.com/
http://www.knmp.nl/
http://lincolnloop.com/
https://anaconda.com/
http://infinite-code.com/
http://www.webucator.com/programming-training/python-training.cfm
http://tivix.com/
https://www.pythonanywhere.com/
https://www.stickermule.com/supports/python
http://www.stickeryou.com/products/custom-stickers/335
http://www.acties.nl/
https://dev.ngo/
https://fusionbox.com/
http://newlogic.io/
https://odoo-community.org/
http://www.opendataservices.coop/
http://www.solden.be/
https://www.1korting.com/
http://www.airportrentals.com/
http://bespaardeals.nl/kortingscode/otto/
http://www.mesika.org/
https://www.verzekeringvergelijk.nl/
http://www.python-academy.com/
https://www.datacamp.com/
https://www.accelebrate.com/python-training
https://www.ukwebhostreview.com/
https://crosscompute.com/
https://www.hostpapa.com/?utm_source=python.org/
http://www.revsys.com/
https://www.confirm.ch/
https://www.motorhomerepublic.com/
https://vpndiensten.nl/
https://www.yourlabs.biz/
https://pythoninstitute.org/
https://www.vpngids.nl/
https://www.cisin.com/
https://www.hrank.com/
https://www.hostinger.com/
http://icons8.com/
https://www.surveymonkey.com/

We’ll need the third-party requests module installed for this code to work.

Here’s a non-concurrent version of this program:

from pathlib import Path

import requests


urls = Path('urls.txt').read_text().splitlines()
responses = {}


def fetch_url(url):
    try:
        response = requests.get(url)
        response.raise_for_status()  # raise an HTTPError for 4xx/5xx responses
        responses[url] = response.text
    except requests.RequestException as e:
        # Report failed requests (e.g. 403 Forbidden) and move on
        print(e)


# Fetch each URL one at a time: each request must finish before the next starts
for url in urls:
    fetch_url(url)

print("{} responses for {} URL.".format(len(responses), len(urls)))
for url, response in responses.items():
    if 'python' in response.lower():
        print('{url} contains the word "Python"'.format(url=url))

Here’s an example run of this program:

$ python sponsors.py
403 Client Error: Forbidden for url: http://www.enthought.com/
403 Client Error: Forbidden for url: http://www.acties.nl/

62 responses for 64 URL.
https://aws.amazon.com/ contains the word "Python"
https://www.heroku.com/home/ contains the word "Python"
http://adimian.com/ contains the word "Python"
http://www.activestate.com/python contains the word "Python"
http://www.oreilly.com/ contains the word "Python"
http://www.eyesopen.com/ contains the word "Python"
http://lincolnloop.com/ contains the word "Python"
https://anaconda.com/ contains the word "Python"
http://infinite-code.com/ contains the word "Python"
http://www.webucator.com/programming-training/python-training.cfm contains the word "Python"
https://www.pythonanywhere.com/ contains the word "Python"
https://dev.ngo/ contains the word "Python"
https://fusionbox.com/ contains the word "Python"
http://newlogic.io/ contains the word "Python"
http://www.python-academy.com/ contains the word "Python"
https://www.datacamp.com/ contains the word "Python"
https://www.accelebrate.com/python-training contains the word "Python"
http://www.revsys.com/ contains the word "Python"
https://www.confirm.ch/ contains the word "Python"
https://pythoninstitute.org/ contains the word "Python"

Threading

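If you have code in your Python process that is I/O-bound and you’d like to use multiple threads so that waiting on I/O doesn’t block the rest of your process, you can use Python’s threading module. Threads help here because CPython releases the GIL while a thread is blocked waiting on I/O, which lets other threads run in the meantime.

The usual approach is to pass a function to threading.Thread as its target, as the program below does. You can also subclass threading.Thread and override its run method, but this typically isn’t necessary.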
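For comparison, here’s a minimal sketch of the subclassing approach. The FetchThread class and its fake fetch (a hypothetical stand-in that upper-cases the URL instead of making an HTTP request) are made up for illustration; the parts that come from threading itself are overriding run() and calling start() and join().

```python
import threading


class FetchThread(threading.Thread):
    """Hypothetical Thread subclass that stores its own result."""

    def __init__(self, url):
        super().__init__()
        self.url = url
        self.result = None

    def run(self):
        # run() is what start() executes in the new thread; a real version
        # would call requests.get(self.url) here instead of this stand-in.
        self.result = self.url.upper()


threads = [FetchThread(url) for url in ['https://a.example/', 'https://b.example/']]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

results = {thread.url: thread.result for thread in threads}
print(results)
```

Passing a target function to threading.Thread does the same job with less ceremony, which is why the subclass is rarely worth it.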

from pathlib import Path
import threading

import requests


urls = Path('urls.txt').read_text().splitlines()
responses = {}


def fetch_url(url):
    response = requests.get(url)
    response.raise_for_status()
    responses[url] = response.text


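# Start one thread per URL so the requests can wait on the network concurrently
jobs = []
for url in urls:
    job = threading.Thread(target=fetch_url, args=(url,))
    job.start()
    jobs.append(job)

# Wait for every thread to finish before reading the shared dictionary
for job in jobs:
    job.join()

print("{} responses for {} URL.".format(len(responses), len(urls)))
for url, response in responses.items():
    if 'python' in response.lower():
        print('{url} contains the word "Python"'.format(url=url))

Here’s an example run of this program. Since fetch_url doesn’t catch the HTTPError this time, each failing thread prints a traceback:
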
$ python sponsors.py
Exception in thread Thread-35:
Traceback (most recent call last):
  File "/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "sponsors.py", line 79, in fetch_url
    response.raise_for_status()
  File "/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: http://www.acties.nl/

Exception in thread Thread-16:
Traceback (most recent call last):
  File "/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "sponsors.py", line 79, in fetch_url
    response.raise_for_status()
  File "/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: http://www.enthought.com/

62 responses for 64 URL.
http://infinite-code.com/ contains the word "Python"
http://www.oreilly.com/ contains the word "Python"
https://aws.amazon.com/ contains the word "Python"
http://adimian.com/ contains the word "Python"
https://anaconda.com/ contains the word "Python"
http://www.revsys.com/ contains the word "Python"
https://www.heroku.com/home/ contains the word "Python"
http://lincolnloop.com/ contains the word "Python"
http://www.eyesopen.com/ contains the word "Python"
https://www.pythonanywhere.com/ contains the word "Python"
http://newlogic.io/ contains the word "Python"
https://www.accelebrate.com/python-training contains the word "Python"
http://www.webucator.com/programming-training/python-training.cfm contains the word "Python"
https://www.confirm.ch/ contains the word "Python"
https://crosscompute.com/ contains the word "Python"
http://www.activestate.com/python contains the word "Python"
https://fusionbox.com/ contains the word "Python"
https://www.datacamp.com/ contains the word "Python"
https://pythoninstitute.org/ contains the word "Python"
https://dev.ngo/ contains the word "Python"
http://www.python-academy.com/ contains the word "Python"

Multi-Processing

If your program is CPU-bound rather than I/O-bound, threading isn’t going to help: in CPython, only one thread can execute Python bytecode at a time. You need parallelism to make CPU-bound programs faster, not just concurrency.

You can use the multiprocessing module in a similar way to the threading module for CPU-bound tasks.

We’re going to take the same I/O-bound program and use multiprocessing instead of threading. This isn’t a great use case for multiprocessing because the program is I/O-bound, but we’ll still achieve concurrency (just using more system resources than threading would, since each process has its own interpreter and memory).

Here’s equivalent code written using the multiprocessing module:

from pathlib import Path
import multiprocessing

import requests


urls = Path('urls.txt').read_text().splitlines()


def fetch_url(url, responses):
    # responses is a Manager dict proxy handed to us by the parent process;
    # writes to it are visible to every process
    response = requests.get(url)
    response.raise_for_status()
    responses[url] = response.text


if __name__ == '__main__':
    # Child processes may re-import this module (the "spawn" start method
    # does), so only the parent should create the manager and processes
    manager = multiprocessing.Manager()
    responses = manager.dict()

    jobs = []
    for url in urls:
        job = multiprocessing.Process(target=fetch_url, args=(url, responses))
        job.start()
        jobs.append(job)

    for job in jobs:
        job.join()

    print("{} responses for {} URL.".format(len(responses), len(urls)))
    for url, response in responses.items():
        if 'python' in response.lower():
            print('{url} contains the word "Python"'.format(url=url))

Note that the API for multiprocessing is very similar to the API for threading.

We call multiprocessing.Process the same way we called threading.Thread before. Process objects have start and join methods the same way Thread objects do.

Note that we’re not using a regular Python dictionary to share state between these processes. The reason is that when we spawn a new process with multiprocessing, that process will not share memory with the other processes.

So we need multiprocessing.Manager().dict(), which creates a dictionary-like proxy object. The real dictionary lives in a separate manager process, and every process holding the proxy reads and writes that same dictionary.
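To make that difference concrete, here’s a minimal sketch (the record and demo functions are hypothetical, and a placeholder string stands in for a real HTTP response) where a child process writes to both an ordinary module-level dict and a Manager dict. Only the Manager dict’s change is visible back in the parent.

```python
import multiprocessing

plain_results = {}  # an ordinary module-level dict


def record(url, shared_results):
    # Both writes happen in the child process. The plain dict is the
    # child's own copy, so the parent never sees this change...
    plain_results[url] = 'fetched'
    # ...while the Manager proxy forwards the write to the manager's
    # server process, where the parent can read it.
    shared_results[url] = 'fetched'


def demo(url):
    with multiprocessing.Manager() as manager:
        shared_results = manager.dict()
        process = multiprocessing.Process(target=record, args=(url, shared_results))
        process.start()
        process.join()
        # Copy into regular dicts before the manager shuts down.
        return dict(plain_results), dict(shared_results)


if __name__ == '__main__':
    plain, shared = demo('https://example.com/')
    print('plain dict in parent:', plain)
    print('manager dict in parent:', shared)
```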

$ python sponsors_processes.py
Process Process-17:
Traceback (most recent call last):
  File "/home/trey/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/trey/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "sponsors_processes.py", line 14, in fetch_url
    response.raise_for_status()
  File "/home/trey/.virtualenvs/morsels/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: http://www.enthought.com/
Process Process-36:
Traceback (most recent call last):
  File "/home/trey/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/trey/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "sponsors_processes.py", line 14, in fetch_url
    response.raise_for_status()
  File "/home/trey/.virtualenvs/morsels/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: http://www.acties.nl/
62 responses for 64 URL.
http://www.oreilly.com/ contains the word "Python"
https://aws.amazon.com/ contains the word "Python"
http://adimian.com/ contains the word "Python"
http://infinite-code.com/ contains the word "Python"
http://www.eyesopen.com/ contains the word "Python"
http://newlogic.io/ contains the word "Python"
https://www.accelebrate.com/python-training contains the word "Python"
https://www.heroku.com/home/ contains the word "Python"
http://lincolnloop.com/ contains the word "Python"
http://www.activestate.com/python contains the word "Python"
http://www.revsys.com/ contains the word "Python"
https://fusionbox.com/ contains the word "Python"
https://anaconda.com/ contains the word "Python"
https://www.pythonanywhere.com/ contains the word "Python"
https://www.datacamp.com/ contains the word "Python"
https://www.confirm.ch/ contains the word "Python"
http://www.webucator.com/programming-training/python-training.cfm contains the word "Python"
https://pythoninstitute.org/ contains the word "Python"
http://www.python-academy.com/ contains the word "Python"
https://dev.ngo/ contains the word "Python"

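Windows doesn’t have os.fork, so multiprocessing starts child processes there with the “spawn” method (macOS also defaults to spawn since Python 3.8). Spawned children re-import your main module, so any code that starts processes must sit under an if __name__ == '__main__': guard. See the “Programming guidelines” section of the multiprocessing docs for the other caveats.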
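For a workload that really is CPU-bound, here’s a minimal sketch that uses multiprocessing.Pool (a higher-level helper from the same module, not used in the example above) to spread a deliberately slow, hypothetical prime-counting function across processes. The function and input sizes are illustrative assumptions; any actual speedup depends on how many CPU cores you have.

```python
import multiprocessing


def count_primes(limit):
    """Count primes below `limit` by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def count_primes_in_parallel(limits):
    # Each call runs in its own worker process with its own interpreter
    # (and its own GIL), so the calls can use separate CPU cores.
    with multiprocessing.Pool() as pool:
        return pool.map(count_primes, limits)


if __name__ == '__main__':
    # The __main__ guard is required wherever child processes re-import
    # this module (the "spawn" start method used on Windows and macOS).
    print(count_primes_in_parallel([10_000, 20_000, 30_000, 40_000]))
```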

Managing Shared State

Concurrent programming pretty much necessitates worrying about race conditions.

When using threads, state is shared between threads so you need to be careful to manage that state in a thread-aware way.

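Individual operations on Python’s built-in data types are thread-safe to an extent (in CPython, the GIL makes many single operations, like assigning to a dictionary key, atomic), but it’s easy to introduce race conditions when a sequence of operations touches shared data. For example, if your code checks whether a key is in a dictionary and then adds it if it’s not, another thread can add the same key between the check and the add: you’ve just introduced a race condition.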
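Here’s a minimal sketch of that check-then-add pattern made safe with a threading.Lock (introduced in more detail below). The names cache and add_if_missing are hypothetical. The lock has to cover both the membership test and the insert; locking only the insert would leave the race in place.

```python
import threading

cache = {}
cache_lock = threading.Lock()
first_writers = []  # records which thread actually inserted each key


def add_if_missing(key, value):
    # Without the lock, two threads could both see `key not in cache`
    # before either one stores it (a check-then-act race). Holding the
    # lock across the check *and* the insert makes them one atomic step.
    with cache_lock:
        if key not in cache:
            cache[key] = value
            first_writers.append(value)


threads = [
    threading.Thread(target=add_if_missing, args=('python', n))
    for n in range(10)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(cache, first_writers)
```

Whichever thread wins, exactly one value ends up stored and exactly one thread records itself as the writer.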

There are a couple of ways to safely manage shared state between threads:

  1. Locks

  2. Message queues

The threading module has a threading.Lock object that you can use for locking pieces of code.

Here’s how we might create a threading.Lock alongside our shared dictionary:

from pathlib import Path
import threading

urls = Path('urls.txt').read_text().splitlines()
responses = {}
responses_lock = threading.Lock()  # guards every access to responses

And here’s an example usage of this lock in a thread:

def fetch_url(url):
    response = requests.get(url)
    response.raise_for_status()
    with responses_lock:
        responses[url] = response.text

Here’s an example using queue.Queue, a thread-safe message queue (meant for threads, not processes) from the standard library’s queue module:

from pathlib import Path
from queue import Queue
import threading

import requests


urls = Path('urls.txt').read_text().splitlines()
response_queue = Queue()


def fetch_url(url):
    response = requests.get(url)
    response.raise_for_status()
    response_queue.put((url, response.text))


for url in urls:
    threading.Thread(target=fetch_url, args=(url,)).start()

main_thread = threading.main_thread()
for thread in threading.enumerate():
    if thread is not main_thread:
        thread.join()

print("{} responses for {} URL.".format(response_queue.qsize(), len(urls)))
while not response_queue.empty():
    url, response = response_queue.get()
    if 'python' in response.lower():
        print('{url} contains the word "Python"'.format(url=url))

Note

If you’re using multiprocessing, you’ll need to use multiprocessing.Queue() instead of queue.Queue() because you’ll need the queue state to be shared between processes.
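Here’s a minimal sketch of the process-based version, with a hypothetical fake_fetch function standing in for the requests call so it runs offline. One detail worth copying: the parent drains the queue before joining the child processes, because the multiprocessing docs warn that joining a process that still has items buffered in a queue can deadlock.

```python
import multiprocessing


def fake_fetch(url, queue):
    # Hypothetical stand-in for requests.get(url).text so the sketch runs offline.
    text = 'Welcome to {}, we love Python'.format(url)
    queue.put((url, text))


if __name__ == '__main__':
    urls = ['https://a.example/', 'https://b.example/', 'https://c.example/']
    response_queue = multiprocessing.Queue()
    jobs = [
        multiprocessing.Process(target=fake_fetch, args=(url, response_queue))
        for url in urls
    ]
    for job in jobs:
        job.start()

    # Drain the queue *before* joining; the timeout avoids hanging forever
    # if a child dies without putting its result.
    responses = dict(response_queue.get(timeout=10) for _ in jobs)

    for job in jobs:
        job.join()

    print(len(responses), 'responses')
```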

This particular example is a little silly because it doesn’t really rely on many of the useful features of message queues (due to the simplicity of the example).

This is nearly equivalent to using a collections.deque or a list, since appends and pops on both are thread-safe (for lists, that’s a CPython implementation detail that relies on the GIL). Assigning to a single dictionary key is also atomic in CPython, so our original un-locked threading example happened to be thread-safe. However, subtle changes to it could suddenly make it unsafe, which is why shared state should be avoided when possible.
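For instance, here’s a minimal sketch that collects results in a collections.deque instead of a queue.Queue. The fake_fetch function is a hypothetical stand-in for the requests call so the example runs offline, and the main thread only drains the deque after every worker has been joined.

```python
import collections
import threading

# A deque's append() and popleft() are documented as thread-safe, so
# worker threads can push results without an explicit lock.
results = collections.deque()


def fake_fetch(url):
    # Hypothetical stand-in for requests.get(url).text.
    results.append((url, 'page for ' + url))


urls = ['https://a.example/', 'https://b.example/', 'https://c.example/']
threads = [threading.Thread(target=fake_fetch, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

responses = {}
while results:
    url, text = results.popleft()
    responses[url] = text
print(sorted(responses))
```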

There are many types of message queues, both in Python and outside of Python.

Message queues tend to be much more straightforward and easier to reason about than locks. I recommend using some sort of message queue over locking in most cases.
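Here’s a minimal sketch of the kind of job message queues are good at, which the simple example above didn’t need: a fixed pool of worker threads pulling URLs from one queue.Queue and pushing results onto another, with None used as a stop signal. The pool size, the sentinel convention, and the 'python' in url check (a stand-in for fetching and scanning each page) are all assumptions for illustration.

```python
import queue
import threading

NUM_WORKERS = 3  # assumption: a small fixed pool instead of one thread per URL
url_queue = queue.Queue()
result_queue = queue.Queue()


def worker():
    while True:
        url = url_queue.get()  # blocks until work (or a sentinel) arrives
        if url is None:        # None is our "no more work" sentinel
            url_queue.task_done()
            break
        # Hypothetical stand-in for fetching the page and searching it.
        result_queue.put((url, 'python' in url))
        url_queue.task_done()


urls = ['https://python.example/', 'https://b.example/',
        'https://c.example/', 'https://pythonista.example/']
threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for thread in threads:
    thread.start()
for url in urls:
    url_queue.put(url)
for _ in threads:
    url_queue.put(None)  # one sentinel per worker, queued after all the URLs

url_queue.join()  # returns once every queued item has been marked task_done()
for thread in threads:
    thread.join()

results = [result_queue.get() for _ in urls]
matches = sorted(url for url, has_python in results if has_python)
print(matches)
```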

The Concurrent Futures Module

The concurrent.futures module is a simplified way to interact with threading or multiprocessing.

It’s much more restrictive than threading or multiprocessing, but it tends to be quite a bit easier for simple use cases.

Here’s an example using threading:

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

import requests


urls = Path('urls.txt').read_text().splitlines()


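def fetch_url(url):
    response = None
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.RequestException as e:
        print(e)
    # Error pages (like the 403s) are still returned; if the request failed
    # outright there is no response, so fall back to an empty string
    text = response.text if response is not None else ''
    return url, text


# One worker thread per URL; the with block shuts the pool down afterward
with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    results = pool.map(fetch_url, urls)
    responses = dict(results)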


print("{} responses for {} URL.".format(len(responses), len(urls)))
for url, response in responses.items():
    if 'python' in response.lower():
        print('{url} contains the word "Python"'.format(url=url))

And example output:

$ python sponsors.py
403 Client Error: Forbidden for url: http://www.acties.nl/
403 Client Error: Forbidden for url: http://www.enthought.com/
64 responses for 64 URL.
https://aws.amazon.com/ contains the word "Python"
https://www.heroku.com/home/ contains the word "Python"
http://adimian.com/ contains the word "Python"
http://www.activestate.com/python contains the word "Python"
http://www.oreilly.com/ contains the word "Python"
http://www.eyesopen.com/ contains the word "Python"
http://lincolnloop.com/ contains the word "Python"
https://anaconda.com/ contains the word "Python"
http://infinite-code.com/ contains the word "Python"
http://www.webucator.com/programming-training/python-training.cfm contains the word "Python"
https://www.pythonanywhere.com/ contains the word "Python"
https://dev.ngo/ contains the word "Python"
https://fusionbox.com/ contains the word "Python"
http://newlogic.io/ contains the word "Python"
http://www.python-academy.com/ contains the word "Python"
https://www.datacamp.com/ contains the word "Python"
https://www.accelebrate.com/python-training contains the word "Python"
http://www.revsys.com/ contains the word "Python"
https://www.confirm.ch/ contains the word "Python"
https://pythoninstitute.org/ contains the word "Python"

Here’s the same program using multiple processes instead of multiple threads:

from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

import requests


urls = Path('urls.txt').read_text().splitlines()


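def fetch_url(url):
    response = None
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.RequestException as e:
        print(e)
    # Error pages (like the 403s) are still returned; if the request failed
    # outright there is no response, so fall back to an empty string
    text = response.text if response is not None else ''
    return url, text


if __name__ == '__main__':
    # Worker processes may re-import this module, so only the main process
    # should create the pool. On Windows, max_workers can't exceed 61.
    with ProcessPoolExecutor(max_workers=min(len(urls), 61)) as pool:
        responses = dict(pool.map(fetch_url, urls))

    print("{} responses for {} URL.".format(len(responses), len(urls)))
    for url, response in responses.items():
        if 'python' in response.lower():
            print('{url} contains the word "Python"'.format(url=url))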

The output is the same as the ThreadPoolExecutor version above.

The executors in the concurrent.futures module are built around passing inputs to functions and collecting their return values. As long as you avoid mutating shared state in your worker functions, you can mostly avoid the need for locks or message queues.
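Beyond map, executors offer submit, which returns a Future for each call, and concurrent.futures.as_completed yields those futures as they finish. Here’s a minimal sketch (fake_fetch is a hypothetical stand-in for the requests call) that also shows an exception raised in a worker thread coming back out of future.result() in the main thread, rather than being printed as a traceback like in the earlier threading example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(url):
    # Hypothetical stand-in for requests.get(url).text; raises for one URL
    # to show how errors surface through futures.
    if 'forbidden' in url:
        raise ValueError('403 for {}'.format(url))
    return 'Python is mentioned on ' + url


urls = ['https://a.example/', 'https://forbidden.example/', 'https://b.example/']
responses = {}
errors = {}

with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns one Future per call; map each Future back to its URL.
    future_to_url = {pool.submit(fake_fetch, url): url for url in urls}
    for future in as_completed(future_to_url):  # yields futures as they finish
        url = future_to_url[future]
        try:
            responses[url] = future.result()  # re-raises the worker's exception
        except ValueError as error:
            errors[url] = str(error)

print(len(responses), 'responses,', len(errors), 'errors')
```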

asyncio

If you’re I/O-bound and your I/O is very slow, or you need many I/O connections open at once, you may want to consider asyncio over threading.

Using asyncio can make your code more resource efficient than threading if you do it right, but it can also be tricky to do right and it can sometimes make for very complicated code.
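Here’s a minimal sketch of the same fetch-and-check shape written with asyncio. Because requests is blocking, it can’t be awaited, so the sketch uses asyncio.sleep as a stand-in for the network wait; a real version would need an asynchronous HTTP client such as the third-party aiohttp library (an assumption; not shown here). All the coroutines run on a single thread, switching whenever one hits an await.

```python
import asyncio


async def fake_fetch(url, delay):
    # asyncio.sleep stands in for awaiting a network response; a real
    # version would need an async HTTP client, since requests is blocking.
    await asyncio.sleep(delay)
    return url, 'Python page at ' + url


async def fetch_all(urls):
    # gather() runs the coroutines concurrently on one thread and returns
    # their results in the same order as the inputs.
    return await asyncio.gather(*(fake_fetch(url, 0.1) for url in urls))


urls = ['https://a.example/', 'https://b.example/', 'https://c.example/']
responses = dict(asyncio.run(fetch_all(urls)))
for url, text in responses.items():
    if 'python' in text.lower():
        print('{url} contains the word "Python"'.format(url=url))
```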