Real World Tutorial 3: Parallel Number Crunching Using Cython

Python was not designed to be very good at parallel processing. There are two major problems at the core of the language that make it hard to implement parallel algorithms.

  • The Global Interpreter Lock
  • Flexible object model

The first of these issues is the most famous obstacle to a convincing multi-threading approach, in which a single instance of the Python interpreter runs in several threads. The second point is more subtle: it makes multi-processing harder, where several independent instances of the Python interpreter work together to achieve parallelism. We will first show an elegant way to work around the Global Interpreter Lock, or GIL: using Cython.
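To make the second point concrete: multi-processing has to ship work between interpreter instances, usually by pickling it, and Python's flexible object model means not every object survives that trip. A minimal illustration (the `square` lambda is just a stand-in example):

```python
import pickle

# Worker processes receive their jobs in serialized (pickled) form,
# but not every Python object can be pickled: a lambda, for instance.
square = lambda x: x * x

try:
    payload = pickle.dumps(square)
    print("pickled", len(payload), "bytes")
except pickle.PicklingError:
    print("cannot pickle a lambda")
```

This is why `multiprocessing` insists on module-level functions for its workers.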

Using Cython to lift the GIL

The GIL means that the Python interpreter will only operate on one thread at a time. Even when we think we run in a gazillion threads, Python itself uses only one. Multi-threading in Python is only useful for waiting on I/O and performing system calls. To do useful CPU-intensive work in multi-threaded mode, we need functions that are implemented in C, and we need to tell Python not to worry about the GIL while we call them. The easiest way to achieve this is to use Cython. We will develop a number-crunching prime adder and have it run in parallel threads.
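A quick way to see the GIL in action is to time a pure-Python CPU-bound loop once serially and once split over two threads; on a standard CPython build the threaded version is no faster. The `count_down` function here is just a stand-in workload:

```python
import threading
import time

def count_down(n):
    # Pure-Python busy loop: the GIL lets only one thread execute it at a time.
    while n > 0:
        n -= 1

N = 2_000_000

# All the work in a single thread.
start = time.perf_counter()
count_down(2 * N)
serial = time.perf_counter() - start

# The same amount of work split over two threads.
start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f"serial: {serial:.2f}s, two threads: {threaded:.2f}s")
```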

We’ll load the multiprocessing, threading and queue modules to do our plumbing, and the cython extension so we can do the number crunching, as is shown in this blog post.

[1]:
%load_ext cython
import multiprocessing
import threading
import queue

We define a function that computes the sum of all primes below a certain integer n, and we don’t try to be smart about it; the point is that it needs a lot of computation. These functions are declared nogil, so that we can be certain no Python objects are accessed. Finally, we create a single Python-exposed function that uses the

with nogil:
    ...

statement. This is a context manager that lifts the GIL for the duration of its body.

[2]:
%%cython

from libc.math cimport ceil, sqrt


cdef inline int _is_prime(int n) nogil:
    """Return whether the input integer is a prime."""
    if n == 2:
        return True
    cdef int max_i = <int>ceil(sqrt(n))
    cdef int i = 2
    while i <= max_i:
        if n % i == 0:
            return False
        i += 1
    return True


cdef unsigned long _sum_primes(int n) nogil:
    """Return the sum of all primes less than n."""
    cdef unsigned long i = 0
    cdef int x
    for x in range(2, n):
        if _is_prime(x):
            i += x
    return i


def sum_primes(int n):
    # result must have a C type: Python objects cannot be touched inside nogil
    cdef unsigned long result
    with nogil:
        result = _sum_primes(n)
    return result

In fact, we only loaded the multiprocessing module to get the number of CPUs on this machine. We also define a decent amount of work in the input_range.

[3]:
input_range = range(int(1e6), int(2e6), int(5e4))
ncpus = multiprocessing.cpu_count()
print("We have {} cores to work on!".format(ncpus))
We have 4 cores to work on!

Single thread

Let’s first run our tests in a single thread:

[4]:
%%time

for i in input_range:
    print(sum_primes(i), end=' ', flush=True)
print()
37550402023 41276629127 45125753695 49161463647 53433406131 57759511224 62287995772 66955471633 71881256647 76875349479 82074443256 87423357964 92878592188 98576757977 104450958704 110431974857 116581137847 122913801665 129451433482 136136977177
CPU times: user 8.62 s, sys: 5.05 ms, total: 8.62 s
Wall time: 8.63 s

Multi-threading: Worker pool

We can do better than that! We now create a queue containing the work to be done, and a pool of threads eating from this queue. The workers will keep on working as long as the queue has work for them.

[5]:
%%time

### We need to define a worker function that fetches jobs from the queue.
def worker(q):
    while True:
        try:
            x = q.get(block=False)
            print(sum_primes(x), end=' ', flush=True)
        except queue.Empty:
            break

### Create the queue, and fill it with input values
work_queue = queue.Queue()
for i in input_range:
    work_queue.put(i)

### Start a number of threads
threads = [
    threading.Thread(target=worker, args=(work_queue,))
    for i in range(ncpus)]

for t in threads:
    t.start()

### Wait until all of them are done
for t in threads:
    t.join()

print()
37550402023 41276629127 45125753695 49161463647 53433406131 57759511224 62287995772 66955471633 71881256647 76875349479 82074443256 87423357964 92878592188 98576757977 104450958704 110431974857 116581137847 122913801665 129451433482 136136977177
CPU times: user 14.7 s, sys: 9.1 ms, total: 14.7 s
Wall time: 3.98 s
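Before moving on, it is worth noting that this worker-pool pattern also exists ready-made in the standard library: `concurrent.futures.ThreadPoolExecutor` manages the queue and the threads for us. A sketch, using a pure-Python `sum_primes` stand-in on a smaller range (only the nogil Cython version would actually run these threads in parallel):

```python
from concurrent.futures import ThreadPoolExecutor

def sum_primes(n):
    # Pure-Python stand-in for the Cython sum_primes from the notebook.
    return sum(x for x in range(2, n)
               if all(x % i for i in range(2, int(x ** 0.5) + 1)))

input_range = range(10_000, 20_000, 2_500)

# map() hands each input to the next free worker thread and
# returns the results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(sum_primes, input_range))

print(results)
```

Unlike the hand-rolled queue above, `pool.map` preserves the input order of the results.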

Using Noodles

On my laptop, a dual-core hyper-threaded Intel(R) Core(TM) i5-5300U CPU, this runs just over two times faster than the single threaded code. However, setting up a queue and a pool of workers is quite cumbersome. Also, this approach doesn’t scale up if the dependencies between our computations get more complex. Next we’ll use Noodles to provide the multi-threaded environment to execute our work. We’ll need three functions:

  • schedule to decorate our work function
  • run_parallel to run the work in parallel
  • gather to collect our work into a workflow

[6]:
from noodles import (schedule, run_parallel, gather)

[7]:
%%time

@schedule
def s_sum_primes(n):
    result = sum_primes(n)
    print(result, end=' ', flush=True)
    return result

p_prime_sums = gather(*(s_sum_primes(i) for i in input_range))
prime_sums = run_parallel(p_prime_sums, n_threads=ncpus)
print()
37550402023 41276629127 45125753695 49161463647 53433406131 57759511224 62287995772 66955471633 71881256647 76875349479 82074443256 87423357964 92878592188 98576757977 104450958704 110431974857 116581137847 122913801665 129451433482 136136977177
CPU times: user 14.7 s, sys: 11.8 ms, total: 14.7 s
Wall time: 4.08 s

That does look much nicer! We have much less code, the code we do have clearly separates function from form, and this approach extends easily to more complex situations.