On Tue, 1 Mar 2005, Mark Kels wrote:

> Can anyone give me a very simple example on thread programming ?
I don't think a simple example is possible, given that threads are inherently for slightly more complex processing than you ordinarily do. That being said, here's an example. This is a made-up process. It shows both queuing and an arbitrary number of threads.

First, the imports:

###########################
import random, os, time, sys
import threading, Queue
###########################

Now, I'll define a class for the "work" to be performed. Basically, each work unit just specifies how long the system is supposed to wait, in seconds. Pretend that it's doing some amount of work that takes that many seconds to perform:

###########################
class workunit(object):
    """
    Sample object to be put on a queue, simulating work to be done.
    variables:
      counter: a serial number, just for identification purposes.
      waittime: the time in seconds it uses up.
      Done: a flag used for a special-purpose end-of-queue sentinel,
            in which case waittime is ignored.
    """
    counter = 1
    def __init__(self, waittime=0, Done=False):
        self.counter = workunit.counter
        self.waittime = waittime
        self.Done = Done
        workunit.counter += 1
###########################

This will be called in one of two ways:

  w = workunit(waittime=20)  # to indicate a wait of 20 seconds; or
  w = workunit(Done=True)    # to add a "dummy" work unit to the queue so
                             # everyone knows the queue is empty and finished.

Okay, imagine a queue (or just a list) full of work units like the above. Here's a plain old sequential NON-THREADED way of processing it:

###########################
def ProcessQueue(work_queue):
    """
    Method to process an element from the queue
    (Non-Threaded implementation).
    All it does is loop, doing the following:
      pull an element from the queue (break out if it's a "Done" marker);
      print a starting message;
      wait for the specified amount of time;
      print an ending message
    """
    while True:
        queue_entry = work_queue.get()
        if queue_entry.Done:
            break
        print "%s starting on workunit %d, %d secs" % \
              (time.asctime(), queue_entry.counter, queue_entry.waittime)
        time.sleep(queue_entry.waittime)
        print "%s ending for workunit %d" % \
              (time.asctime(), queue_entry.counter)
############################

Okay, understand that first. See what it's doing? It's just popping things off the work_queue, printing a message, waiting for the amount of time indicated in the work unit, and printing another message. Then it starts over.

Now, let's try the same approach with threads. First, the class declaration:

#############################
class ThreadProcessQueue(threading.Thread):
    """
    This is a Threaded equivalent to ProcessQueue().
    """
#############################

Now, here's ThreadProcessQueue.__init__:

#############################
    def __init__(self, threadname, work_queue, **kwds):
        self.tname = threadname
        self.work_queue = work_queue
        threading.Thread.__init__(self, **kwds)
        print "%s Thread %s started" % (time.asctime(), self.tname)
#############################

The parameters here are an arbitrary name for the thread and the queue it will process. All __init__ does is print a message that the thread started. Strictly speaking, the thread has only been created at this point. It doesn't actually run until its start() method is called.

Here's the guts of it, the ThreadProcessQueue.run method. Note how similar it is to the non-threaded version:

#############################
    def run(self):
        while True:
            queue_entry = self.work_queue.get()
            if queue_entry.Done:
                break
            print "%s Thread %s starting on workunit %d, %d secs" % \
                  (time.asctime(), self.tname, queue_entry.counter, queue_entry.waittime)
            time.sleep(queue_entry.waittime)
            print "%s Thread %s ending for workunit %d" % \
                  (time.asctime(), self.tname, queue_entry.counter)
        print "%s %s thread ending." \
              % (time.asctime(), self.tname)
        self.work_queue.put(queue_entry)
#############################

There are only two real differences. First, the messages include an identifier, so you can see which thread is generating which message. Second, there's a self.work_queue.put(queue_entry) at the end. I'll discuss that at the end of this message.

Now, here's the main program that uses these. First, some setup:

############################
print "%s MAIN starting..." % (time.asctime())
work_queue = Queue.Queue()
NumWorkUnits = 8
NumThreads = 3
WaitTimes = [3, 6, 9, 12, 1, 5, 5, 1]
# WaitTimes is just a list of some arbitrary times representing work.
# A particular work unit will wait for one of these times.
ThreadList = []
############################

Queue is a specialized type of FIFO list that is made to be shared among concurrently running threads. We use it instead of a plain list.

NumWorkUnits and NumThreads are just constants: the number of work units we'll put in the queue, and the number of threads that will read from it.

WaitTimes is just an arbitrary list of numbers that we'll randomly select from later. Every work unit gets a random one of these, which indicates how long it should take to be processed (e.g., 1 for a 1-second work unit, 12 for a 12-second work unit, etc.).

ThreadList is a list that will contain all the threads. Let's get those threads going:

#################################
# make up a list of threads (not started yet)
for i in range(1, NumThreads+1):
    ThreadName = "T%03d" % i
    ThreadList.append(ThreadProcessQueue(ThreadName, work_queue))
#################################

Okay, this has created three threads (NumThreads = 3), named T001, T002 and T003. Each has been passed the work_queue, which is still empty, and all three threads have been added to ThreadList. The threads all exist at this point, but have not yet been started.
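For readers on Python 3, here is a sketch of the same two classes. In Python 3 the Queue module is called queue and print is a function. This is an adaptation, not the code above. The names WorkUnit, Worker and finished are hypothetical. The worker records (thread name, unit number) pairs in a shared list instead of printing them. It also re-posts the sentinel inside the loop rather than after it. The sketch stops where the text just stopped: the threads have been constructed, but is_alive() is False for each of them until start() is called.

```python
import queue
import threading
import time


class WorkUnit:
    """Hypothetical Python 3 counterpart of workunit."""
    counter = 1

    def __init__(self, waittime=0, done=False):
        self.counter = WorkUnit.counter  # serial number for identification
        self.waittime = waittime         # simulated work, in seconds
        self.done = done                 # True marks the end-of-queue sentinel
        WorkUnit.counter += 1


class Worker(threading.Thread):
    """Hypothetical Python 3 counterpart of ThreadProcessQueue."""

    def __init__(self, threadname, work_queue, finished):
        super().__init__(name=threadname)
        self.work_queue = work_queue
        self.finished = finished  # shared list of (thread name, unit number)

    def run(self):
        while True:
            entry = self.work_queue.get()   # blocks while the queue is empty
            if entry.done:
                self.work_queue.put(entry)  # re-post the sentinel for the next worker
                break
            time.sleep(entry.waittime)      # pretend to do the work
            # list.append is atomic in CPython, so no extra lock is used here
            self.finished.append((self.name, entry.counter))


work_queue = queue.Queue()
finished = []
workers = [Worker("T%03d" % i, work_queue, finished) for i in range(1, 4)]

# The Thread objects exist, but none has been started, so none is alive yet.
print([w.name for w in workers])        # ['T001', 'T002', 'T003']
print([w.is_alive() for w in workers])  # [False, False, False]
```

From here, starting the threads works the same way as in the message. Call start() on each thread, load the queue, and put a single WorkUnit(done=True) at the end.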
That's next:

###############################
# start the threads
for t in ThreadList:
    t.start()
print "%s MAIN: all threads started" % (time.asctime())
###############################

This just starts the threads, so the run method in each thread starts running. The problem is that, with the work queue empty, they'll just sit there, blocked in work_queue.get(). So, let's start putting stuff into the work queue:

###############################
# Start putting things on the queue
for i in range(NumWorkUnits):
    random_wait = random.choice(WaitTimes)
    w = workunit(random_wait)
    work_queue.put(w)
###############################

This just selects a random wait time from the WaitTimes list, creates a work unit specifying that amount of time, and puts the work unit on the queue. At this point the threads should start waking up, because they were all blocked in work_queue.get() on an empty queue. Now that they can pull things off of it, they will.

I also want to put an end-of-queue element here:

###############################
# Put a shutdown indicator
work_queue.put(workunit(Done=True))
###############################

That's it! Just for grins, we can add a "final" shutdown message:

############################
print "%s MAIN: all done." % (time.asctime())
############################

But you'll see that doesn't work very well. Here's what I see as output on one run:

  Wed Mar 02 00:40:05 2005 MAIN starting...
  Wed Mar 02 00:40:05 2005 Thread T001 started
  Wed Mar 02 00:40:05 2005 Thread T002 started
  Wed Mar 02 00:40:05 2005 Thread T003 started
  Wed Mar 02 00:40:05 2005 MAIN: all threads started
  Wed Mar 02 00:40:05 2005 Thread T001 starting on workunit 1, 5 secs
  Wed Mar 02 00:40:05 2005 Thread T002 starting on workunit 2, 12 secs
  Wed Mar 02 00:40:05 2005 Thread T003 starting on workunit 3, 9 secs
  Wed Mar 02 00:40:05 2005 MAIN: all done.
  Wed Mar 02 00:40:10 2005 Thread T001 ending for workunit 1
  Wed Mar 02 00:40:10 2005 Thread T001 starting on workunit 4, 6 secs
  Wed Mar 02 00:40:14 2005 Thread T003 ending for workunit 3
  Wed Mar 02 00:40:14 2005 Thread T003 starting on workunit 5, 12 secs
  Wed Mar 02 00:40:16 2005 Thread T001 ending for workunit 4
  Wed Mar 02 00:40:16 2005 Thread T001 starting on workunit 6, 12 secs
  Wed Mar 02 00:40:17 2005 Thread T002 ending for workunit 2
  Wed Mar 02 00:40:17 2005 Thread T002 starting on workunit 7, 1 secs
  Wed Mar 02 00:40:18 2005 Thread T002 ending for workunit 7
  Wed Mar 02 00:40:18 2005 Thread T002 starting on workunit 8, 1 secs
  Wed Mar 02 00:40:19 2005 Thread T002 ending for workunit 8
  Wed Mar 02 00:40:19 2005 T002 thread ending.
  Wed Mar 02 00:40:26 2005 Thread T003 ending for workunit 5
  Wed Mar 02 00:40:26 2005 T003 thread ending.
  Wed Mar 02 00:40:28 2005 Thread T001 ending for workunit 6
  Wed Mar 02 00:40:28 2005 T001 thread ending.

From the queue's point of view, here is how the work got parcelled out among the threads:

  Work units:
    1:  5 sec (T001)
    2: 12 sec (T002)
    3:  9 sec (T003)
    4:  6 sec (T001)
    5: 12 sec (T003)
    6: 12 sec (T001)
    7:  1 sec (T002)
    8:  1 sec (T002)

Here's a graphical view of what the threads were doing, with one character per second:

  T001: 11111444444666666666666
  T002: 22222222222278
  T003: 333333333555555555555

You can see that T002 ended first, followed by T003 and then T001. This matches the timestamps.

Okay, there are a couple of oddities. The first is that self.work_queue.put(queue_entry) line. It's there so that the first thread to see the Done marker puts it back on the queue before it exits. That way, each of the other threads sees it in turn, puts it back on for the next one, and exits. I could instead have written this to just exit when the queue was empty, but there are some ugly issues with that, too. For example, if whatever loads up the queue pauses, the queue empties and all the threads quit. The queue loader then adds more work, but the threads are already gone.
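Here is a Python 3 sketch of the whole pipeline. As in the message, each worker re-posts the end-of-queue sentinel before exiting, so one sentinel is enough for any number of threads. It also deals with the early "MAIN: all done." line in the output above. The main thread calls Thread.join() on each worker before emitting its final message. join() blocks until that thread has finished running, so the final message comes out last.

This version differs from the code above in several ways:
- It passes a plain function as target= instead of subclassing threading.Thread.
- It uses a bare object() as the sentinel instead of a Done flag.
- It scales the wait times down to hundredths of a second so the run finishes quickly.
- It collects messages in a list instead of printing them as they happen.

The names worker, main, DONE and log are hypothetical.

```python
import queue
import random
import threading
import time

DONE = object()  # hypothetical sentinel, standing in for workunit(Done=True)


def worker(name, work_queue, log):
    """Consume (unit number, wait) tuples until the sentinel appears."""
    while True:
        item = work_queue.get()
        if item is DONE:
            work_queue.put(DONE)  # pass the sentinel on to the next worker
            log.append("%s thread ending." % name)
            return
        counter, waittime = item
        time.sleep(waittime)  # pretend to do the work
        # list.append is atomic in CPython, so the shared log needs no lock here
        log.append("%s ending for workunit %d" % (name, counter))


def main(num_work_units=8, num_threads=3):
    # Same wait times as the message, scaled from seconds to hundredths
    wait_times = [0.03, 0.06, 0.09, 0.12, 0.01, 0.05, 0.05, 0.01]
    work_queue = queue.Queue()
    log = []
    threads = [threading.Thread(target=worker,
                                args=("T%03d" % i, work_queue, log))
               for i in range(1, num_threads + 1)]
    for t in threads:
        t.start()
    for counter in range(1, num_work_units + 1):
        work_queue.put((counter, random.choice(wait_times)))
    work_queue.put(DONE)
    for t in threads:
        t.join()  # blocks until this worker's function has returned
    log.append("MAIN: all done.")  # only reached after every worker exits
    return log


if __name__ == "__main__":
    for line in main():
        print(line)
```

The order of the per-unit lines varies from run to run, just as in the real output. "MAIN: all done." is always the last line, though.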
The other oddity: note that the MAIN "final message" was issued long before the threads were done. I wanted this example to wait nicely for all the threads to end before putting out that message, but couldn't figure out how to do that. For those familiar with threads: calls to t.isAlive() returned True long after the thread referred to had finished up and put out its final shutdown message.

Anyway, I hope this helps as a bit of a tutorial.

_______________________________________________
Tutor maillist  -  Tutor@python.org
http://mail.python.org/mailman/listinfo/tutor