Python Multiprocessing Fork Waitpid And Queue
Here I investigate a few ways to do multiprocessing in Python. I investigate Fork, Thread, Multiprocessing and a Queue.
First Step: Fork
In this first step we make a spork with the value 42. After forking with os.fork there are two processes, each a clone of the original. Each of them then sets the value to something different. Even though the child makes the last modification (after a 2 second delay instead of 1 second), the parent process never sees the child's value. Why is this?
Well, you might want to read up on it a bit in Wikipedia: [1]. But in this particular case you have two spork instances in the two processes - these are not the same spork. So no matter how much the child alters its spork, the parent spork will remain unchanged.
The output under cygwin looks something like this (notice that the child process sets the fish but that it does not impact the parent):
$ python fork1.py [4440]: 42 [4440]: parent says child has pid 5880 [4440]: 1337 [5880]: fish [4440]: 1337
The code in this case is:
"""fork1.py -- show that a forked child works on its own copy of an object."""
import os
import sys
from time import sleep


class spork:
    """Hold a single value and print it tagged with the current process id."""

    def __init__(self, n):
        """Store the initial value ``n``."""
        self._n = n

    def update(self, n):
        """Replace the stored value with ``n``."""
        self._n = n

    def hello(self):
        """Print ``[<pid>]: <value>`` for the calling process."""
        # A single parenthesised argument prints identically on Python 2 and 3.
        print("[%s]: %s" % (os.getpid(), self._n))


if __name__ == '__main__':
    s = spork(42)
    s.hello()
    # Flush first: output still sitting in the stdio buffer would be copied
    # into the child by fork() and printed a second time (e.g. when piped).
    sys.stdout.flush()
    pid = os.fork()
    if pid:
        # parent
        print("[%s]: parent says child has pid %s" % (os.getpid(), pid))
        sleep(1)
        s.update(1337)
        s.hello()
        os.waitpid(pid, 0)
    else:
        # child: changes its own copy of the spork only
        sleep(2)
        s.update('fish')
        s.hello()
        # sys.exit() instead of the interactive-shell helper exit()
        sys.exit(0)
    # Only the parent gets here; its value is still the one it set itself.
    s.hello()
Step 2: pipe between processes
In my case I want the value to be passed from the child to the parent process. A common way seems to be to use pipes. A pipe is created and the child is allowed to write and the parent read - this way the child can pass a message to the parent. In this example I send a message a few times but it feels kinda sloppy. Notice in the output that the child passes a value to the parent:
$ python fork2.py [1416]: 42 [1416]: parent says child has pid 5636 [5636]: fish [1416]: fish [5636]: stingray [1416]: stingray [5636]: cocoon [1416]: cocoon [5636]: yogaplant [1416]: yogaplant [1416]: yogaplant
The code here uses a fork, and os.fdopen to open the read and write from the pipe as file- or stream-like handles:
if __name__ == '__main__':
    #
    # open a pipe that the child writes to and the parent reads from
    # see http://www.myelin.co.nz/post/2003/3/13/
    #
    import sys  # local import: this listing only shows the main section

    s = spork(42)
    s.hello()
    r, w = os.pipe()
    # Flush first: output still sitting in the stdio buffer would be copied
    # into the child by fork() and printed a second time (e.g. when piped).
    sys.stdout.flush()
    pid = os.fork()
    if pid:
        # parent now closes write and reads
        print("[%s]: parent says child has pid %s" % (os.getpid(), pid))
        sleep(1)
        os.close(w)
        # The with-block closes the read end once the loop is finished.
        with os.fdopen(r) as handle:
            # readline() on purpose: Python 2 file iteration reads ahead and
            # would hold lines back instead of handing them over one by one.
            while True:
                fish = handle.readline().strip()
                if not fish:
                    # end of file (child closed its end) or a blank line
                    break
                s.update(fish)
                s.hello()
        os.waitpid(pid, 0)
    else:
        # child now closes read and writes
        sleep(2)
        os.close(r)
        # Leaving the with-block closes the write end, which is what makes
        # the parent's readline() return '' and stop its loop.
        with os.fdopen(w, 'w') as handle:
            for fish in ['fish', 'stingray', 'cocoon', 'yogaplant']:
                sleep(1)
                s.update(fish)
                s.hello()
                handle.write(fish + '\n')
                # push every message through immediately
                handle.flush()
        # sys.exit() instead of the interactive-shell helper exit()
        sys.exit(0)
    # Only the parent gets here; it holds the last value the child sent.
    s.hello()
Step 3: Multiprocess safe Queue
The real problem I had required an unknown number of values to be sent. Here I also added a kill pill.
"""Step 3: pass values from a forked child to its parent through a Queue."""
import os
import sys
from time import sleep
from multiprocessing import Queue
from signal import SIGTERM, SIGKILL

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2


class safe_spork():
    """A value holder that is fed through a queue shared between processes.

    ``q`` is the shared queue and ``n`` the initial value.  The string
    'killpill' is special: the process that polls it kills the child whose
    pid was registered with set_pid() and then exits.
    """

    def __init__(self, q, n):
        self._q = q
        self._n = n
        # pid must exist before the first poll(): a kill pill that is
        # already queued makes poll() look at self.pid.
        self.pid = None
        self.poll()

    def set_pid(self, pid):
        """Remember the pid of the child to kill when a kill pill arrives."""
        self.pid = pid

    def add(self, n):
        """Put ``n`` on the queue without blocking."""
        self.hello('Spork adds %s to queue' % n)
        self._q.put_nowait(n)

    def poll(self):
        """Try to take one item from the queue.

        Returns True when an item was taken (it becomes the current value)
        and False when the queue had nothing to give.  When the item is
        'killpill' the registered child is killed and this process exits.
        """
        if self._q.empty():
            # Only informative; the get below is attempted regardless.
            self.hello('there was nothing new in the queue(?)')
        try:
            item = self._q.get_nowait()
        except Empty:
            self.hello('Found nothing in the queue')
            return False
        self._n = item
        self.hello('I found something new')
        if self._n == 'killpill':
            self.hello('Mmmh, cyanide...')
            # No child registered (pid None) means there is nothing to kill;
            # this also avoids os.kill(0, ...), which targets the whole
            # process group.
            if self.pid:
                self._kill_child()
            sys.exit(0)
        return True

    def _kill_child(self):
        """Send SIGTERM to the child, wait briefly, then send SIGKILL."""
        try:
            os.kill(self.pid, SIGTERM)
        except OSError:
            pass  # best effort: os.kill refused, e.g. no such process
        sleep(0.55)
        try:
            os.kill(self.pid, SIGKILL)
        except OSError:
            pass  # best effort, as above

    def read(self):
        """Return the current value."""
        return self._n

    def hello(self, msg=""):
        """Print ``[<pid>]: <value> | <msg>`` for the calling process."""
        print("[%s]: %s | %s" % (os.getpid(), self.read(), msg))

    def check(self):
        """Print the number of items the queue reports."""
        self.hello("queue has %s items" % self._q.qsize())


if __name__ == '__main__':
    #
    # use a multiprocess Queue
    #
    q = Queue()
    s = safe_spork(q, 42)
    s.hello()
    # Flush first: output still sitting in the stdio buffer would be copied
    # into the child by fork() and printed a second time (e.g. when piped).
    sys.stdout.flush()
    pid = os.fork()
    if pid:
        # parent
        s.set_pid(pid)
        print("[%s]: parent says child has pid %s" % (os.getpid(), pid))
        sleep(1.2)
        s.add(1337)
        s.add(1337)
        s.poll()
        print("parent will wait for kid")
        ret = False
        while not ret:
            try:
                # toggle here for interesting results
                ret = os.waitpid(pid, os.WNOHANG)[0]
                #ret = os.waitpid(pid, 0)[0]
            except OSError:
                ret = True
                print("kid completed")
            print("parent is still waiting - check queue [%s]" % ret)
            s.poll()
            sleep(0.97)
        print("parent is done waiting for kid - clear queue and exit")
        something = True
        it = 40
        # Drain what is left, but give up after 40 polls.
        while something and it > 0:
            it -= 1
            something = s.poll()
            sleep(0.16)
        print("queue is empty - let's move on, it was %s" % it)
    else:
        # child
        sleep(2.6)
        for i in range(5):
            s.add('fish%s' % i)
        print("first batch of fish is away!")
        sleep(7.1)
        s.add('killpill')
        print("killpill sent!")
        sleep(5.1)
        for i in range(5, 12):
            s.add('fish%s' % i)
        print("kid added more fish, will now exit")
        # sys.exit() instead of the interactive-shell helper exit()
        sys.exit(0)
    s.check()
In the naive case we await the child and then process the queue - we use ret = os.waitpid(pid, 0)[0]. Here the parent is the 14033-process and the child the 14034-process. The output will be in this form:
- setup
- parent will start waiting
- kid adds items in queue
- when kid is complete the parent will process the queue.
[14033]: 42 | there was nothing new in the queue(?) [14033]: 42 | Found nothing in the queue [14033]: 42 | [14033]: parent says child has pid 14034 [14033]: 42 | Spork adds 1337 to queue [14033]: 42 | Spork adds 1337 to queue [14033]: 1337 | I found something new parent will wait for kid [14034]: 42 | Spork adds fish0 to queue [14034]: 42 | Spork adds fish1 to queue [14034]: 42 | Spork adds fish2 to queue [14034]: 42 | Spork adds fish3 to queue [14034]: 42 | Spork adds fish4 to queue first batch of fish is away! [14034]: 42 | Spork adds killpill to queue killpill sent! [14034]: 42 | Spork adds fish5 to queue [14034]: 42 | Spork adds fish6 to queue [14034]: 42 | Spork adds fish7 to queue [14034]: 42 | Spork adds fish8 to queue [14034]: 42 | Spork adds fish9 to queue [14034]: 42 | Spork adds fish10 to queue [14034]: 42 | Spork adds fish11 to queue kid added more fish, will now exit parent is still waiting - check queue [14034] [14033]: 1337 | I found something new parent is done waiting for kid - clear queue and exit [14033]: fish0 | I found something new [14033]: fish1 | I found something new [14033]: fish2 | I found something new [14033]: fish3 | I found something new [14033]: fish4 | I found something new [14033]: killpill | I found something new [14033]: killpill | Mmmh, cyanide...
In the second version we run the processes in parallel with ret = os.waitpid(pid, os.WNOHANG)[0]. Here the parent will not wait for the child - instead it will poll the queue over and over again.
[14123]: 42 | there was nothing new in the queue(?) [14123]: 42 | Found nothing in the queue [14123]: 42 | [14123]: parent says child has pid 14124 [14123]: 42 | Spork adds 1337 to queue [14123]: 42 | Spork adds 1337 to queue [14123]: 42 | there was nothing new in the queue(?) [14123]: 1337 | I found something new parent will wait for kid parent is still waiting - check queue [0] [14123]: 1337 | I found something new parent is still waiting - check queue [0] [14123]: 1337 | there was nothing new in the queue(?) [14123]: 1337 | Found nothing in the queue [14124]: 42 | Spork adds fish0 to queue [14124]: 42 | Spork adds fish1 to queue [14124]: 42 | Spork adds fish2 to queue [14124]: 42 | Spork adds fish3 to queue [14124]: 42 | Spork adds fish4 to queue first batch of fish is away! parent is still waiting - check queue [0] [14123]: fish0 | I found something new parent is still waiting - check queue [0] [14123]: fish1 | I found something new parent is still waiting - check queue [0] [14123]: fish2 | I found something new parent is still waiting - check queue [0] [14123]: fish3 | I found something new parent is still waiting - check queue [0] [14123]: fish4 | I found something new parent is still waiting - check queue [0] [14123]: fish4 | there was nothing new in the queue(?) [14123]: fish4 | Found nothing in the queue parent is still waiting - check queue [0] [14123]: fish4 | there was nothing new in the queue(?) [14123]: fish4 | Found nothing in the queue [14124]: 42 | Spork adds killpill to queue killpill sent! parent is still waiting - check queue [0] [14123]: killpill | I found something new [14123]: killpill | Mmmh, cyanide...
Step 4: Multiprocessing
Since I am a pretty curious person I wanted to know how easy or intuitive the multiprocessing library in standard Python is - so I tested working with it as well. It turns out it is very intuitive. In this example the parent creates 8 child processes. The example:
- spawns new processes with multiprocessing.Process(target=subwork_A, args=args)
- Starts, terminates or joins processes with
- worker.start()
- worker.terminate()
- worker.join()
- The parent checks the status of the child with worker.is_alive() (a lot nicer than saying: I want to wait for a PID but I don't want to wait - os.waitpid(pid, os.WNOHANG).)
- Write to standard out in a safe way - using a multiprocessing.Lock().
I also add some testing of the at exit (atexit) module. With it I let the system take care of terminating the children if I were to encounter an exception or do exit(): atexit.register(worker.terminate).
"""Step 4: start, watch, terminate and join workers with multiprocessing."""
import multiprocessing
import time
import sys
import atexit
from signal import SIGKILL, SIGTERM
from os import kill, getpid


def log(logstream, lock, msg):
    """Write ``msg`` to ``logstream`` prefixed with the process name.

    ``lock`` is held while writing and flushing so that lines coming from
    different processes are not interleaved.
    """
    msg = "%s: %s" % (multiprocessing.current_process().name, msg)
    with lock:
        logstream.write(msg + '\n')
        logstream.flush()


def subwork_A(logstream, name, n, delay, lock):
    """Run ``n`` iterations, sleeping ``delay`` seconds before each report."""
    log(logstream, lock, "%s: I will do some subwork" % name)
    for i in range(n):
        time.sleep(delay)
        log(logstream, lock, "%s: Work iteration %s of %s" % (name, i, n))
    log(logstream, lock, "%s: Done working" % name)


def subwork_B(logstream, name, n, delay, lock):
    """Like subwork_A but sleeps only a quarter of ``delay`` per iteration."""
    log(logstream, lock, "%s: I only do fast stuff" % name)
    for i in range(n):
        # 4.0 keeps this a true division for integer delays on Python 2 too
        time.sleep(delay / 4.0)
        log(logstream, lock, "%s: Look how fast I finised %s" % (name, i))
    log(logstream, lock, "%s: Done working" % name)


if __name__ == "__main__":
    stdoutlock = multiprocessing.Lock()
    names = ['Fyodor', 'Mordor', 'Compilator', 'Terminator']
    its = [1, 1, 2, 5]
    delays = [0.4, 0.7, 0.47, 0.092]

    # One A-worker and one B-worker per name, in that order.
    workers = list()
    for name, count, delay in zip(names, its, delays):
        args = (sys.stdout, 'A-' + name, count, delay, stdoutlock)
        workers.append(multiprocessing.Process(target=subwork_A, args=args))
        args = (sys.stdout, 'B-' + name, count, delay, stdoutlock)
        workers.append(multiprocessing.Process(target=subwork_B, args=args))
    log(sys.stdout, stdoutlock, "all workers in position")

    for worker in workers:
        worker.start()
        # terminate the child if the parent exits or dies from an exception
        atexit.register(worker.terminate)
    log(sys.stdout, stdoutlock, "all workers started")

    if False:  # True:
        log(sys.stdout, stdoutlock, "will the workers die now?")
        # these are not caught by atexit.register
        #kill(getpid(), SIGKILL)
        #kill(getpid(), SIGTERM)
        # these are caught by atexit.register
        #exit()
        raise Exception('hello')

    # Give the workers three chances to finish on their own.
    for _ in range(3):
        if any(worker.is_alive() for worker in workers):
            log(sys.stdout, stdoutlock, "they are not all dead")
            time.sleep(0.19)
        else:
            log(sys.stdout, stdoutlock, "all workers are dead")
            break

    log(sys.stdout, stdoutlock, "I've had enough and want to exit now!")
    for worker in workers:
        if worker.is_alive():
            log(sys.stdout, stdoutlock, "silence %s I kill you!" % worker.name)
            worker.terminate()
    for worker in workers:
        worker.join()
    log(sys.stdout, stdoutlock, "all workers joined")
As the code is written above you get output similar to the below. By raising an exception in the if-false-branch the output will be different. Also note that atexit does not catch SIGKILL or SIGTERM.
MainProcess: all workers in position Process-1: A-Fyodor: I will do some subwork Process-2: B-Fyodor: I only do fast stuff Process-3: A-Mordor: I will do some subwork Process-4: B-Mordor: I only do fast stuff Process-5: A-Compilator: I will do some subwork Process-6: B-Compilator: I only do fast stuff Process-7: A-Terminator: I will do some subwork MainProcess: all workers started MainProcess: they are not all dead Process-8: B-Terminator: I only do fast stuff Process-8: B-Terminator: Look how fast I finised 0 Process-8: B-Terminator: Look how fast I finised 1 Process-8: B-Terminator: Look how fast I finised 2 Process-7: A-Terminator: Work iteration 0 of 5 Process-8: B-Terminator: Look how fast I finised 3 Process-2: B-Fyodor: Look how fast I finised 0 Process-2: B-Fyodor: Done working Process-8: B-Terminator: Look how fast I finised 4 Process-8: B-Terminator: Done working Process-6: B-Compilator: Look how fast I finised 0 Process-4: B-Mordor: Look how fast I finised 0 Process-4: B-Mordor: Done working Process-7: A-Terminator: Work iteration 1 of 5 MainProcess: they are not all dead Process-6: B-Compilator: Look how fast I finised 1 Process-6: B-Compilator: Done working Process-7: A-Terminator: Work iteration 2 of 5 Process-7: A-Terminator: Work iteration 3 of 5 MainProcess: they are not all dead Process-1: A-Fyodor: Work iteration 0 of 1 Process-1: A-Fyodor: Done working Process-7: A-Terminator: Work iteration 4 of 5 Process-7: A-Terminator: Done working Process-5: A-Compilator: Work iteration 0 of 2 MainProcess: I've had enough and want to exit now! MainProcess: silence Process-3 I kill you! MainProcess: silence Process-5 I kill you! MainProcess: all workers joined
Step 5: Thread
Example with threads inspired by Doug Hellmann's Python Module of the Week, see [2].
We use a Multiprocess Queue to add jobs from the main process. The threads pull jobs from it and process them one by one. We use -1 as a kill pill.
"""Step 5: worker threads that pull jobs from a queue until a kill pill."""
from threading import Thread
from multiprocessing import Queue
from time import sleep
import logging

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

logging.basicConfig(level=logging.DEBUG, format='%(message)s')


class MyThread(Thread):
    """My worker thread"""

    def __init__(self, logger, queue, name, delay):
        """Ctor

        ``logger`` is a callable taking one message string, ``queue`` is
        where jobs are fetched from, ``name`` labels the log lines and
        ``delay`` scales how long the thread sleeps.
        """
        Thread.__init__(self)
        self.q = queue
        self.delay = delay
        self.name = name
        self.logger = logger
        self.log("I am alive now.")

    def start(self):
        """Log that the thread is starting, then start it."""
        self.log("Starting")
        Thread.start(self)

    def log(self, message):
        """Do some logging"""
        self.logger(" %s -- %s" % (self.name, message))

    def work(self, item):
        """Do the real work

        Returns True to keep the thread going; an item that is not greater
        than zero (the kill pill is -1) returns False.
        """
        self.log("Working on item %s" % item)
        sleep(self.delay * 0.77)
        return item > 0

    def run(self):
        """Work loop"""
        alive = True
        while alive:
            # Only the queue access is guarded, so an exception coming out
            # of work() is never mistaken for an empty queue.
            try:
                item = self.q.get_nowait()
            except Empty:
                self.log("No job for me.")
                sleep(self.delay * 3.13)
            else:
                alive = self.work(item)
        self.log("I am Done!!!")


if __name__ == "__main__":
    logger = logging.debug
    q = Queue()

    def mylog(message):
        """Log a message from the main thread."""
        logger("%s" % message)

    names = ['Fyodor', 'Mordor', 'Terminator']
    delays = [1, 0.7, 0.47]
    threads = list()

    mylog("Spawning threads")
    for name, delay in zip(names, delays):
        threads.append(MyThread(logger, q, name, delay))
    sleep(1.7)

    mylog("Start the threads")
    for thread in threads:
        thread.start()
    sleep(4.7)

    mylog("Add some jobs, then allow some jobs to finish")
    for N in range(100, 170, 9):
        q.put(N)
    sleep(2.2)

    mylog("Kill threads (once all jobs are done)")
    # one kill pill per thread
    for _ in threads:
        q.put(-1)
The output is
Spawning threads Fyodor -- I am alive now. Mordor -- I am alive now. Terminator -- I am alive now. Start the threads Fyodor -- Starting Mordor -- Starting Fyodor -- No job for me. Mordor -- No job for me. Terminator -- Starting Terminator -- No job for me. Terminator -- No job for me. Mordor -- No job for me. Terminator -- No job for me. Fyodor -- No job for me. Mordor -- No job for me. Terminator -- No job for me. Add some jobs, then allow some jobs to finish Terminator -- Working on item 100 Terminator -- Working on item 109 Fyodor -- Working on item 118 Mordor -- Working on item 127 Terminator -- Working on item 136 Kill threads (once all jobs are done) Terminator -- Working on item 145 Fyodor -- Working on item 154 Mordor -- Working on item 163 Terminator -- Working on item -1 Mordor -- Working on item -1 Terminator -- I am Done!!! Fyodor -- Working on item -1 Mordor -- I am Done!!! Fyodor -- I am Done!!!
Belongs to Kategori Programmering.