Per Erik Strandberg

Here I investigate a few ways to do multiprocessing in Python: fork, pipes, a multiprocess-safe Queue, the multiprocessing module, and threads.

Step 1: Fork

In this first step we create a spork with the value 42. os.fork then clones the process, so from here on there are two processes, and each of them sets the value to something different. Even though the child makes the last modification (it sleeps 2 seconds instead of 1), the parent keeps its own value and never sees the child's. Why is this?

Well, you might want to read up on it a bit on Wikipedia: [1]. In short, the child gets its own copy of the parent's memory, so in this particular case there are two spork instances, one in each process, and they are not the same spork. No matter how much the child alters its spork, the parent's spork remains unchanged.

The output under Cygwin looks something like this (notice that the child process sets its value to fish, but that this does not affect the parent):

$ python fork1.py
[4440]: 42
[4440]: parent says child has pid 5880
[4440]: 1337
[5880]: fish
[4440]: 1337

The code in this case is:

import os
from time import sleep

class spork:

    def __init__(self, n):
        self._n = n
        return

    def update(self, n):
        self._n = n

    def hello(self):
        print "[%s]: %s" % (os.getpid(), self._n)
        return

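if __name__ == '__main__':
    s = spork(42)
    s.hello()

    # fork returns the child's pid in the parent and 0 in the child
    pid = os.fork()

    if pid:
        # parent
        print "[%s]: parent says child has pid %s" % (os.getpid(), pid)
        sleep(1)
        s.update(1337)
        s.hello()
        os.waitpid(pid, 0)
    else:
        # child: s is the child's own copy, so this update never reaches the parent
        sleep(2)
        s.update('fish')
        s.hello()
        exit(0)
    # only the parent gets here, and its spork still holds 1337
    s.hello()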
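The same point can be checked without reading the printed output. Below is a minimal sketch in Python 3 (the code above is Python 2); the function name value_after_child_modifies is my own, and it assumes a Unix-like system where os.fork is available. The child calls os._exit instead of exit so that it leaves at once, without running cleanup code or flushing buffers it inherited from the parent.

```python
import os

def value_after_child_modifies(initial):
    """Fork; the child changes its copy of `data`, the parent returns its own copy."""
    data = {"n": initial}
    pid = os.fork()
    if pid == 0:
        # Child: this only modifies the child's private copy of the dict
        data["n"] = "fish"
        os._exit(0)  # leave at once, skipping inherited atexit handlers and buffers
    os.waitpid(pid, 0)  # parent: make sure the child has made its change
    return data["n"]

if __name__ == "__main__":
    print(value_after_child_modifies(42))  # prints 42
```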

Step 2: Pipe between processes

In my case I want the value to be passed from the child process to the parent. A common way to do this seems to be a pipe: os.pipe() creates a pair of file descriptors, the child writes to one end and the parent reads from the other, so the child can pass messages to the parent. In this example I send a message a few times, one per line, but it feels kinda sloppy. Notice in the output that each value the child sets is passed on to the parent:

$ python fork2.py
[1416]: 42
[1416]: parent says child has pid 5636
[5636]: fish
[1416]: fish
[5636]: stingray
[1416]: stingray
[5636]: cocoon
[1416]: cocoon
[5636]: yogaplant
[1416]: yogaplant
[1416]: yogaplant

The code again uses a fork, and os.fdopen to wrap the read and write ends of the pipe in file-like handles. The imports and the spork class are the same as in step 1:

if __name__ == '__main__':                                              
    #                                                                           
    # open a pipe that the child writes to and the parent reads from            
    # see http://www.myelin.co.nz/post/2003/3/13/                                   
    #                                                                           
    s = spork(42)
    s.hello()

    r, w = os.pipe()
    pid = os.fork()

    if pid:
        # parent now closes write and reads                                     
        print "[%s]: parent says child has pid %s" % (os.getpid(), pid)
        sleep(1)
        os.close(w)
        handle = os.fdopen(r)
        while True:
            fish = handle.readline().strip()
            if not fish:
                break
            s.update(fish)
            s.hello()
        os.waitpid(pid, 0)
    else:
        #child now closes read and writes                                       
        sleep(2)
        os.close(r)
        handle = os.fdopen(w, 'w')
        for fish in ['fish', 'stingray', 'cocoon', 'yogaplant']:
            sleep(1)
            s.update(fish)
            s.hello()
            handle.write(fish+'\n')
            handle.flush()
        handle.close()
        exit(0)
    s.hello()
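For comparison, here is a Python 3 sketch of the same pipe pattern that returns what the parent received instead of printing it. The function name read_from_child is hypothetical, and the sketch assumes a Unix-like system. Closing the unused end in each process matters: the parent only sees end-of-file once every copy of the write end has been closed.

```python
import os

def read_from_child(messages):
    """Fork a child that writes one message per line to a pipe; return what the parent read."""
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: close the unused read end, write each message on its own line
        os.close(r)
        with os.fdopen(w, "w") as out:  # leaving the with-block flushes and closes w
            for msg in messages:
                out.write(msg + "\n")
        os._exit(0)
    # Parent: close the unused write end so EOF arrives when the child closes its end
    os.close(w)
    with os.fdopen(r) as inp:
        received = [line.rstrip("\n") for line in inp]  # reads until EOF
    os.waitpid(pid, 0)
    return received
```

Because the parent reads until end-of-file rather than for a fixed count, it does not need to know in advance how many messages the child will send.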

Step 3: Multiprocess-safe Queue

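The real problem I had required an unknown number of values to be sent. Here I use a multiprocessing Queue instead of a pipe, and I also add a kill pill: when the parent reads the value 'killpill' from the queue, it kills the child and exits.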


import os
from time import sleep
from Queue import Empty
from multiprocessing import Queue
from signal import SIGTERM, SIGKILL

class safe_spork():

    def __init__(self, q, n):
        self._q = q
        self._n = n
        self.pid = None  # set before poll(), which uses it when a kill pill arrives
        self.poll()
        return

    def set_pid(self, pid):
        self.pid = pid

    def add(self, n):
        self.hello('Spork adds %s to queue' % n)
        self._q.put_nowait(n)
        return

    def poll(self):
        if self._q.empty():
            self.hello('there was nothing new in the queue(?)')
        try:
            self._n = self._q.get_nowait()
            self.hello('I found something new')
        except Empty, e:
            self.hello('Found nothing in the queue')
            return False

        if self._n == 'killpill':
            self.hello('Mmmh, cyanide...')
            try:
                os.kill(self.pid, SIGTERM)
            except OSError:
                pass
            sleep(0.55)
            try:
                os.kill(self.pid, SIGKILL)
            except OSError:
                pass
            exit(0)

        return True

    def read(self):
        return self._n

    def hello(self, msg=""):
        print "[%s]: %s | %s" % (os.getpid(), self.read(), msg )
        return

    def check(self):
        self.hello("queue has %s items" % self._q.qsize())


if __name__ == '__main__':
    #
    # use a multiprocess Queue
    #
    q = Queue()
    s = safe_spork(q, 42)
    s.hello()

    pid = os.fork()

    if pid:
        # parent
        s.set_pid(pid)
        print "[%s]: parent says child has pid %s" % (os.getpid(), pid)
        sleep(1.2)
        s.add(1337)
        s.add(1337)
        s.poll()
        print "parent will wait for kid"
        ret = False
        while not ret:
            try:
                # toggle here for interesting results
                ret = os.waitpid(pid, os.WNOHANG)[0]
                #ret = os.waitpid(pid, 0)[0]
            except OSError, e:
                ret = True
                print "kid completed"
            print "parent is still waiting - check queue [%s]" % ret
            s.poll()
            sleep(0.97)
        print "parent is done waiting for kid - clear queue and exit"
        something = True
        it = 40
        while (something) and (it > 0):
            it -= 1
            something = s.poll()
            sleep(0.16)
        print "queue is empty - let's move on, it was %s" % it

    else:
        #child
        sleep(2.6)
        for i in xrange(5):
            s.add('fish%s' % i)
        print "first batch of fish is away!"

        sleep(7.1)
        s.add('killpill')
        print "killpill sent!"
        sleep(5.1)

        for i in xrange(5,12):
            s.add('fish%s' % i)
        print "kid added more fish, will now exit"
        exit(0)
    s.check()
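The poll() method above checks empty() and then calls get_nowait(), which is part of why the timing with sleep() calls gets fiddly (the multiprocessing documentation also notes that empty() is not reliable). A minimal Python 3 sketch of an alternative, assuming a Unix-like system: the consumer blocks on get() until the kill pill arrives. Here the producer sends the kill pill itself as its last item, so nobody has to kill anyone. The names producer and consume_until_killpill are made up for the illustration, and I use multiprocessing.Process with the "fork" start method instead of a bare os.fork.

```python
import multiprocessing

KILLPILL = "killpill"  # sentinel; must be a value the producer never sends as data

def producer(q, n):
    """Child: send n items (the consumer does not know n), then the kill pill."""
    for i in range(n):
        q.put("fish%d" % i)
    q.put(KILLPILL)

def consume_until_killpill(n):
    # "fork" matches the os.fork-based code above; it is only available on Unix
    ctx = multiprocessing.get_context("fork")
    q = ctx.Queue()
    child = ctx.Process(target=producer, args=(q, n))
    child.start()
    received = []
    while True:
        item = q.get(timeout=10)  # blocks until an item arrives; timeout is a safety net
        if item == KILLPILL:
            break
        received.append(item)
    child.join()  # everything has been read, so the child can exit cleanly
    return received
```

Reading everything up to the kill pill before calling join() also follows the multiprocessing guideline of not joining a process whose queued items have not been consumed yet.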

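In the naive case the parent waits for the child to finish and only then processes the queue; for this, toggle the code to use ret = os.waitpid(pid, 0)[0]. Here the parent is process 14033 and the child is process 14034. The output looks like this (note that the parent stops at the kill pill, so fish5 to fish11 are never read):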

[14033]: 42 | there was nothing new in the queue(?)
[14033]: 42 | Found nothing in the queue
[14033]: 42 | 
[14033]: parent says child has pid 14034
[14033]: 42 | Spork adds 1337 to queue
[14033]: 42 | Spork adds 1337 to queue
[14033]: 1337 | I found something new
parent will wait for kid
[14034]: 42 | Spork adds fish0 to queue
[14034]: 42 | Spork adds fish1 to queue
[14034]: 42 | Spork adds fish2 to queue
[14034]: 42 | Spork adds fish3 to queue
[14034]: 42 | Spork adds fish4 to queue
first batch of fish is away!
[14034]: 42 | Spork adds killpill to queue
killpill sent!
[14034]: 42 | Spork adds fish5 to queue
[14034]: 42 | Spork adds fish6 to queue
[14034]: 42 | Spork adds fish7 to queue
[14034]: 42 | Spork adds fish8 to queue
[14034]: 42 | Spork adds fish9 to queue
[14034]: 42 | Spork adds fish10 to queue
[14034]: 42 | Spork adds fish11 to queue
kid added more fish, will now exit
parent is still waiting - check queue [14034]
[14033]: 1337 | I found something new
parent is done waiting for kid - clear queue and exit
[14033]: fish0 | I found something new
[14033]: fish1 | I found something new
[14033]: fish2 | I found something new
[14033]: fish3 | I found something new
[14033]: fish4 | I found something new
[14033]: killpill | I found something new
[14033]: killpill | Mmmh, cyanide...

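In the second version, the one active in the code above, we run the processes in parallel with ret = os.waitpid(pid, os.WNOHANG)[0]. With os.WNOHANG, waitpid returns immediately with a pid of 0 while the child is still running, so the parent does not wait for the child; instead it polls the queue over and over again: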

[14123]: 42 | there was nothing new in the queue(?)
[14123]: 42 | Found nothing in the queue
[14123]: 42 | 
[14123]: parent says child has pid 14124
[14123]: 42 | Spork adds 1337 to queue
[14123]: 42 | Spork adds 1337 to queue
[14123]: 42 | there was nothing new in the queue(?)
[14123]: 1337 | I found something new
parent will wait for kid
parent is still waiting - check queue [0]
[14123]: 1337 | I found something new
parent is still waiting - check queue [0]
[14123]: 1337 | there was nothing new in the queue(?)
[14123]: 1337 | Found nothing in the queue
[14124]: 42 | Spork adds fish0 to queue
[14124]: 42 | Spork adds fish1 to queue
[14124]: 42 | Spork adds fish2 to queue
[14124]: 42 | Spork adds fish3 to queue
[14124]: 42 | Spork adds fish4 to queue
first batch of fish is away!
parent is still waiting - check queue [0]
[14123]: fish0 | I found something new
parent is still waiting - check queue [0]
[14123]: fish1 | I found something new
parent is still waiting - check queue [0]
[14123]: fish2 | I found something new
parent is still waiting - check queue [0]
[14123]: fish3 | I found something new
parent is still waiting - check queue [0]
[14123]: fish4 | I found something new
parent is still waiting - check queue [0]
[14123]: fish4 | there was nothing new in the queue(?)
[14123]: fish4 | Found nothing in the queue
parent is still waiting - check queue [0]
[14123]: fish4 | there was nothing new in the queue(?)
[14123]: fish4 | Found nothing in the queue
[14124]: 42 | Spork adds killpill to queue
killpill sent!
parent is still waiting - check queue [0]
[14123]: killpill | I found something new
[14123]: killpill | Mmmh, cyanide...
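The non-blocking wait can be isolated in a small Python 3 sketch, assuming a Unix-like system. The helper wait_without_blocking is hypothetical: it forks a child that sleeps, then calls os.waitpid(pid, os.WNOHANG) in a loop, where a returned pid of 0 means the child is still running. The place where it counts polls is where the parent above checks its queue.

```python
import os
import time

def wait_without_blocking(child_seconds, poll_interval=0.05):
    """Fork a sleeping child and reap it with waitpid(WNOHANG); return (polls, exit code)."""
    pid = os.fork()
    if pid == 0:
        time.sleep(child_seconds)  # child: pretend to do some work
        os._exit(7)                # exit status the parent should observe
    polls = 0
    while True:
        done_pid, status = os.waitpid(pid, os.WNOHANG)
        if done_pid != 0:          # (0, 0) means the child is still running
            return polls, os.WEXITSTATUS(status)
        polls += 1                 # the parent could poll its queue here instead
        time.sleep(poll_interval)
```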

Step 4: Multiprocessing

Since I am a pretty curious person I wanted to know how easy or intuitive the multiprocessing module in the Python standard library is, so I tested it as well. It turns out it is very intuitive. In this example I create 8 child processes that run in parallel with the parent and share a lock for writing to stdout.

I also test the atexit module. By calling atexit.register(worker.terminate) for every worker, I let the interpreter take care of terminating the children if the parent hits an uncaught exception or calls exit().

import multiprocessing
import time
import sys
import atexit
from signal import SIGKILL, SIGTERM
from os import kill, getpid

def log(logstream, lock, msg):
    msg = "%s: %s" % (multiprocessing.current_process().name, msg)
    with lock:
        logstream.write(msg + '\n')
        logstream.flush()
    return

def subwork_A(logstream, name, n, delay, lock):
    log(logstream, lock, "%s: I will do some subwork" % name)
    for i in xrange(n):
        time.sleep(delay)
        log(logstream, lock, "%s: Work iteration %s of %s" % (name, i, n))
    log(logstream, lock, "%s: Done working" % name)
    return

def subwork_B(logstream, name, n, delay, lock):
    log(logstream, lock, "%s: I only do fast stuff" % name)
    for i in xrange(n):
        time.sleep(delay/4)
        log(logstream, lock, "%s: Look how fast I finised %s" % (name, i))
    log(logstream, lock, "%s: Done working" % name)
    return


if __name__ == "__main__":
    stdoutlock = multiprocessing.Lock()
    names = ['Fyodor', 'Mordor', 'Compilator', 'Terminator']
    its = [1, 1, 2, 5]
    delays = [0.4, 0.7, 0.47, 0.092]
    workers = list()
    for i in xrange(4):
        args = (sys.stdout, 'A-'+names[i], its[i], delays[i], stdoutlock)
        workers.append(multiprocessing.Process(target=subwork_A, args=args))
        args = (sys.stdout, 'B-'+names[i], its[i], delays[i], stdoutlock)
        workers.append(multiprocessing.Process(target=subwork_B, args=args))

    log(sys.stdout, stdoutlock, "all workers in position")
    for worker in workers:
        worker.start()
        atexit.register(worker.terminate)

    log(sys.stdout, stdoutlock, "all workers started")

    if False: # True:
        log(sys.stdout, stdoutlock, "will the workers die now?")

        # these are not caught by atexit.register
        #kill(getpid(), SIGKILL)
        #kill(getpid(), SIGTERM)

        # these are caught by atexit.register
        #exit()
        raise Exception('hello')

    for _ in xrange(3):
        all_dead = True
        for worker in workers:
            if worker.is_alive():
                all_dead = False
        if all_dead:
            log(sys.stdout, stdoutlock, "all workers are dead")
            break
        else:
            log(sys.stdout, stdoutlock, "they are not all dead")
            time.sleep(0.19)

    log(sys.stdout, stdoutlock, "I've had enough and want to exit now!")

    for worker in workers:
        if worker.is_alive():
            log(sys.stdout, stdoutlock, "silence %s I kill you!" % worker.name)
            worker.terminate()

    for worker in workers:
        worker.join()

    log(sys.stdout, stdoutlock, "all workers joined")

With the code as written above you get output similar to the one below. If you change if False: to if True: so that the exception is raised, the output will be different. Also note that the atexit handlers are not run when the parent is killed with SIGKILL or SIGTERM.

MainProcess: all workers in position
Process-1: A-Fyodor: I will do some subwork
Process-2: B-Fyodor: I only do fast stuff
Process-3: A-Mordor: I will do some subwork
Process-4: B-Mordor: I only do fast stuff
Process-5: A-Compilator: I will do some subwork
Process-6: B-Compilator: I only do fast stuff
Process-7: A-Terminator: I will do some subwork
MainProcess: all workers started
MainProcess: they are not all dead
Process-8: B-Terminator: I only do fast stuff
Process-8: B-Terminator: Look how fast I finised 0
Process-8: B-Terminator: Look how fast I finised 1
Process-8: B-Terminator: Look how fast I finised 2
Process-7: A-Terminator: Work iteration 0 of 5
Process-8: B-Terminator: Look how fast I finised 3
Process-2: B-Fyodor: Look how fast I finised 0
Process-2: B-Fyodor: Done working
Process-8: B-Terminator: Look how fast I finised 4
Process-8: B-Terminator: Done working
Process-6: B-Compilator: Look how fast I finised 0
Process-4: B-Mordor: Look how fast I finised 0
Process-4: B-Mordor: Done working
Process-7: A-Terminator: Work iteration 1 of 5
MainProcess: they are not all dead
Process-6: B-Compilator: Look how fast I finised 1
Process-6: B-Compilator: Done working
Process-7: A-Terminator: Work iteration 2 of 5
Process-7: A-Terminator: Work iteration 3 of 5
MainProcess: they are not all dead
Process-1: A-Fyodor: Work iteration 0 of 1
Process-1: A-Fyodor: Done working
Process-7: A-Terminator: Work iteration 4 of 5
Process-7: A-Terminator: Done working
Process-5: A-Compilator: Work iteration 0 of 2
MainProcess: I've had enough and want to exit now!
MainProcess: silence Process-3 I kill you!
MainProcess: silence Process-5 I kill you!
MainProcess: all workers joined
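The note about atexit and signals can be checked with a small Python 3 sketch. It runs a tiny child program in a subprocess several times, leaving it in a different way each time, and reports whether the program's atexit handler got to print. The child program and the helper handler_ran are made up for this illustration, and the signal cases assume a Unix-like system.

```python
import subprocess
import sys

# Tiny child program: register an atexit handler, then leave in the way given by argv[1]
CHILD_PROGRAM = """
import atexit, os, signal, sys, time
atexit.register(lambda: print("atexit handler ran", flush=True))
how = sys.argv[1]
if how == "exit":
    sys.exit(0)
elif how == "exception":
    raise Exception("hello")
elif how == "sigterm":
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(5)  # only reached if the signal were somehow delayed
elif how == "sigkill":
    os.kill(os.getpid(), signal.SIGKILL)
    time.sleep(5)
elif how == "os_exit":
    os._exit(0)
# any other value: fall off the end of the program normally
"""

def handler_ran(how):
    """Run the child program and report whether its atexit handler printed anything."""
    result = subprocess.run([sys.executable, "-c", CHILD_PROGRAM, how],
                            capture_output=True, text=True, timeout=30)
    return "atexit handler ran" in result.stdout

if __name__ == "__main__":
    for how in ["normal", "exit", "exception", "sigterm", "sigkill", "os_exit"]:
        print(how, handler_ran(how))
```

If SIGTERM should run the cleanup after all, one option is to install a handler with signal.signal(signal.SIGTERM, ...) that calls sys.exit(); SIGKILL can never be caught or handled.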

Step 5: Thread

An example with threads, inspired by Doug Hellmann's Python Module of the Week, see [2].

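We use a multiprocessing Queue to add jobs from the main thread. The threads pull jobs from it and process them one by one. The value -1 acts as a kill pill: work() returns item > 0, so a thread stops after working on any item that is not positive. (Since everything runs in a single process, a plain Queue.Queue would work just as well here.)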

from threading import Thread
from multiprocessing import Queue
from Queue import Empty
from time import sleep
import logging

logging.basicConfig(level=logging.DEBUG, format='%(message)s')

class MyThread(Thread):
    """My worker thread"""

    def __init__(self, logger, queue, name, delay):
        """Ctor"""
        Thread.__init__(self)
        self.q = queue
        self.delay = delay
        self.name = name
        self.logger = logger
        self.log("I am alive now.")
        return

    def start(self):
        self.log("Starting")
        Thread.start(self)
        return

    def log(self, message):
        """Do some logging"""
        self.logger("    %s -- %s" % (self.name, message))
        return

    def work(self, item):
        """Do the real work"""
        self.log("Working on item %s" % item)
        sleep(self.delay * 0.77)
        return item > 0

    def run(self):
        """Work loop"""
        alive = True
        while alive:
            try:
                item = self.q.get_nowait()
                alive = self.work(item)
            except Empty:
                self.log("No job for me.")
                sleep(self.delay * 3.13)
        self.log("I am Done!!!")
        return

if __name__ == "__main__":

    logger = logging.debug
    q = Queue()

    def mylog(message):
        logger("%s" % message)

    names = ['Fyodor', 'Mordor', 'Terminator']
    delays = [1, 0.7, 0.47]
    threads = list()

    mylog("Spawning threads")
    for i in xrange(3):
        t = MyThread(logger, q, names[i], delays[i])
        threads.append(t)
    sleep(1.7)

    mylog("Start the threads")
    for thread in threads:
        thread.start()

    sleep(4.7)

    mylog("Add some jobs, then allow some jobs to finish")
    for N in xrange(100, 170, 9):
        q.put(N)
    sleep(2.2)

    mylog("Kill threads (once all jobs are done)")
    for _ in threads:
        q.put(-1)

The output is something like this (the exact interleaving depends on thread scheduling):

Spawning threads
    Fyodor -- I am alive now.
    Mordor -- I am alive now.
    Terminator -- I am alive now.
Start the threads
    Fyodor -- Starting
    Mordor -- Starting
    Fyodor -- No job for me.
    Mordor -- No job for me.
    Terminator -- Starting
    Terminator -- No job for me.
    Terminator -- No job for me.
    Mordor -- No job for me.
    Terminator -- No job for me.
    Fyodor -- No job for me.
    Mordor -- No job for me.
    Terminator -- No job for me.
Add some jobs, then allow some jobs to finish
    Terminator -- Working on item 100
    Terminator -- Working on item 109
    Fyodor -- Working on item 118
    Mordor -- Working on item 127
    Terminator -- Working on item 136
Kill threads (once all jobs are done)
    Terminator -- Working on item 145
    Fyodor -- Working on item 154
    Mordor -- Working on item 163
    Terminator -- Working on item -1
    Mordor -- Working on item -1
    Terminator -- I am Done!!!
    Fyodor -- Working on item -1
    Mordor -- I am Done!!!
    Fyodor -- I am Done!!!
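A Python 3 sketch of the same worker-thread pattern, using the thread-safe queue.Queue and a blocking get() instead of get_nowait() plus sleep(). The function name run_workers, the use of None as the kill pill and doubling each item as the stand-in for real work are all my own choices for the illustration.

```python
import queue
import threading

def run_workers(items, n_threads=3):
    """Process `items` with worker threads; None is the kill pill (one per thread)."""
    jobs = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            item = jobs.get()  # blocks until a job arrives, no polling needed
            if item is None:   # kill pill: stop this thread
                break
            with results_lock:
                results.append(item * 2)  # stand-in for the real work

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for item in items:
        jobs.put(item)
    for _ in threads:
        jobs.put(None)  # one kill pill per thread, queued after all real jobs
    for t in threads:
        t.join()
    return sorted(results)
```

Because the queue is first-in first-out and all kill pills are queued after the real jobs, every job has been picked up before any thread sees its pill, so nothing is left unprocessed when join() returns.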

