On 04/18/2013 02:16 AM, Chuck Mayers wrote:
Hi! I was having a bit of nostalgia today, and thought I'd try to
write a simple, old school BBS. I found the 'paramiko' library, and
I've got something I can SSH into that would have impressed my 1990's
self.
I found some example code of the "threading" library, and I've managed
to use it to take multiple incoming connections at once.
I've never done any multithreaded programming, and everything I've
ever read is essentially "don't do it! It's really hard to get right!"
True, it can be difficult, but how are you supposed to learn if you
don't try? And sometimes you need to do two things at once, or at least
nearly at once.
Everything I've read today says to use the Queue library.
I can't seem to wrap my head around it, though. I've not seen any
example code that looks like what I'm trying to do.
Some examples of things I'd like to do:
Have one thread open a file (that all threads will want to read/write
to) and update it
Have one thread broadcast to all other threads a "chat message"
The only way I can think of, with the Queue library, would be the
following (I know this has to be the "wrong way" to do this):
1. create an object MultiTask which will have methods to handle thread
sensitive things like "update a file", etc
2. create one instance of it (we'll call it mtask) and add it to the queue
3. each thread, when it needs to, does queue.get() to get that instance
4. the thread then calls whatever it needs to, say
mtask.update_users_file()
5. then, call queue.put(mtask) to put it back on the queue
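
The five steps above can be sketched roughly as follows. This is only an
illustration: the body of MultiTask, the session() function and the user
names are made up, and an in-memory list stands in for the real users
file.

```python
import threading
from queue import Queue  # the module is named "Queue" on Python 2


class MultiTask(object):
    """Hypothetical holder for the thread-sensitive operations (step 1)."""

    def __init__(self):
        self.users = []  # stand-in for the shared users file

    def update_users_file(self, name):
        # Only the thread currently holding the instance can get here.
        self.users.append(name)


token_queue = Queue(maxsize=1)
token_queue.put(MultiTask())  # step 2: the one and only instance


def session(name):
    mtask = token_queue.get()  # step 3: blocks while the queue is empty
    try:
        mtask.update_users_file(name)  # step 4
    finally:
        token_queue.put(mtask)  # step 5: always hand the instance back


threads = [threading.Thread(target=session, args=('user%d' % i,))
           for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

mtask = token_queue.get()
final_users = sorted(mtask.users)
token_queue.put(mtask)
print(final_users)
```

Used this way the queue behaves like a lock: whoever holds the instance
has exclusive access, and everyone else waits in get(). The try/finally
matters, because a thread that fails between get() and put() would
otherwise leave every other thread blocked forever.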
Yeah, I am not sure that will work, but maybe. What I did was to model
my code loosely on the Mediator pattern: the mediator controls the
queue and assigns the workload to multiple threads, creating new
threads as needed. Not that my way is the best; it is just how I did it.
http://sourcemaking.com/design_patterns/mediator
My Mediator does not follow the design pattern very well. It is called
Dispatcher, and it uses workers to get the jobs done; the jobs are
explicitly defined too. The code is not as good as it could be, because
I have not used it for many months, but the tests should work.
It was written with Python 3 in mind. It is attached.
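
For readers who only want the core idea, here is a much smaller sketch
of jobs being handed to worker threads through a Queue. It is not the
attached module: it assumes a fixed pool of three workers instead of
creating threads as needed, and the worker() function, the None
sentinel and the use of pow() as the job are all made up for the
example.

```python
import threading
from queue import Queue  # the module is named "Queue" on Python 2


def worker(jobs, results):
    # Each worker loops forever, taking one job at a time off the queue.
    while True:
        job = jobs.get()
        if job is None:  # sentinel: no more work for this worker
            jobs.task_done()
            break
        func, args = job
        try:
            results.put(func(*args))
        finally:
            jobs.task_done()


jobs = Queue()
results = Queue()

workers = [threading.Thread(target=worker, args=(jobs, results))
           for _ in range(3)]
for w in workers:
    w.daemon = True
    w.start()

for n in range(5):
    jobs.put((pow, (n, 2)))  # each job is a (function, arguments) pair
for _ in workers:
    jobs.put(None)  # one sentinel per worker

jobs.join()  # returns once task_done() was called for every item
for w in workers:
    w.join()

squares = sorted(results.get() for _ in range(5))
print(squares)
```

The threads never touch each other's data directly; everything they
share travels through the two queues, which do their own locking.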
So, there is only ever 1 object in the queue. I assume that when
another thread tries to get() it, that thread blocks until the thread
holding it puts it back.
Will this work? Is there a better way to do this?
Thanks!
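
One possible alternative for the two wishes above (a shared file and a
chat broadcast) is sketched below, under some assumptions: a plain
threading.Lock guards the shared file, and every connected client gets
its own inbox Queue that a broadcast copies the message into. The names
register(), broadcast(), update_users() and listen() are hypothetical,
and a list stands in for the real file.

```python
import threading
from queue import Queue  # the module is named "Queue" on Python 2

file_lock = threading.Lock()
users = []  # stand-in for the shared users file

inboxes = {}  # client name -> that client's inbox Queue
inboxes_lock = threading.Lock()


def register(name):
    inbox = Queue()
    with inboxes_lock:
        inboxes[name] = inbox
    return inbox


def broadcast(sender, text):
    # Copy the message into every inbox except the sender's own.
    with inboxes_lock:
        targets = [q for name, q in inboxes.items() if name != sender]
    for q in targets:
        q.put((sender, text))


def update_users(name):
    with file_lock:  # only one thread at a time runs this block
        users.append(name)


received = Queue()


def listen(name, inbox):
    update_users(name)
    sender, text = inbox.get()  # blocks until a chat message arrives
    received.put((name, sender, text))


register('alice')
listeners = []
for name in ('bob', 'carol'):
    t = threading.Thread(target=listen, args=(name, register(name)))
    t.start()
    listeners.append(t)

broadcast('alice', 'hello, BBS')
for t in listeners:
    t.join()

messages = sorted(received.get() for _ in listeners)
print(messages)
```

In a real session thread the blocking inbox.get() would probably be
replaced by get_nowait() or a get() with a timeout, so the thread can
also keep servicing its SSH channel.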
_______________________________________________
Tutor maillist - Tutor@python.org
To unsubscribe or change subscription options:
http://mail.python.org/mailman/listinfo/tutor
# -*- coding: utf-8 -*-
# Provides event driven threaded workers and a dispatcher for handling multiple
# tasks simultaneously.

# Import sys for Python version check, before we import and cause a traceback.
import sys
version = sys.hexversion

# Standard Module Imports
import threading

# Version If Statements:
if version <= 0x02050000:
    print ("""Feral Components: Mediator
This script requires Python 2.5 or newer to run.
""")
    sys.exit()
if version <= 0x03000000:
    from Queue import Queue, Empty
else:
    from queue import Queue, Empty

# Non-Standard Module Imports

# Local Module Imports
import mediator

# Globals

class Job(object):
    # Base Class, used to give jobs to Dispatcher.
    def __init__(self, func, concurrent=True, name=None, args=None,
                 callback=None):
        self.worker = None  # This becomes the threaded worker class instance.
        self.function = func  # Function to be run.
        self.concurrent = concurrent  # Run simultaneous. Boolean
        self.arguments = args  # Arguments that are passed to the function.
        self.name = name  # The human readable Job name that's used for logging.
        self.callback = callback  # The callback that is made when the worker
                                  # joins and terminates.

class ThreadDaemon(threading.Thread):  # DEPRECATED
    # Base Threading Class for Workers. All Workers are Daemons.
    # http://docs.python.org/library/threading.html#threading.Thread.daemon
    def __init__(self):
        print('DEPRECATED: ThreadDaemon class was deprecated on 4 June 2012.')
        threading.Thread.__init__(self)
        self.daemon = True
        print('Thread: Initialized. As Thread = ' + self.name)

    def __del__(self):
        print('Thread: Exiting. As Thread = ' + self.name)

class Worker(ThreadDaemon):
    # Worker completes the function given to it as a separate thread, using the
    # Job object.
    def __init__(self, job):
        #ThreadDaemon.__init__(self)
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.event = None
        self.job = job

    def run(self):
        print('Worker ' + self.job.name + ': Running. As Thread = ' +
              self.thread.name)
        if self.job.arguments is not None:
            self.job.function(self.job.arguments)
        else:
            self.job.function()

    def __del__(self):  # The process of deleting the worker blocks Dispatcher.
        if self.job.callback is not None:
            self.job.callback()
        #ThreadDaemon.__del__(self)

class Dispatcher(ThreadDaemon):
    # Dispatcher is a loop that responds to events by checking its Queue.
    # It is implemented using a separate thread with a moderate sleep time, so
    # it does add slight CPU overhead.
    # I felt the need to add Dispatcher to give me a better sense of control, but
    # also as a separate thread that could take care of background tasks as needed.
    # Dispatcher prevents the UI from becoming unresponsive, because it handles
    # long running tasks.
    def __init__(self, Mediator):
        ThreadDaemon.__init__(self)
        self.queue = Queue()
        self.concurrent = []
        self.nonconcurrent = []
        self.nonconcurrent_running = False
        self.Mediator = Mediator
        self.running = True
        self.Mediator.put(mediator.Event(name='SUBSCRIBE', event='EXIT',
                                         listener=self.queue))
        self.Mediator.put(mediator.Event(name='SUBSCRIBE', event='Dispatch_Job',
                                         listener=self.queue))

    def dispatch(self, job):
        if not job.concurrent:
            self.nonconcurrent.append(job)
            self.dispatch_nonconcurrent()
        else:
            job.worker = Worker(job)
            job.worker.thread.start()
            self.concurrent.append(job)
        # All workers called by the Dispatcher must be Threads.
        # OPTIMIZE: Read Below.
        # http://stackoverflow.com/questions/481970/how-many-threads-is-too-many

    def dispatch_nonconcurrent(self):
        if not self.nonconcurrent_running:
            if len(self.nonconcurrent) > 0:
                self.nonconcurrent_running = True
                job = self.nonconcurrent[0]
                job.worker = Worker(job)
                job.worker.thread.start()
        else:
            if not self.nonconcurrent[0].worker.thread.is_alive():
                self.nonconcurrent_running = False
                del self.nonconcurrent[0].worker  # __del__
                del self.nonconcurrent[0]
                self.queue.task_done()
                self.dispatch_nonconcurrent()

    def run(self):
        print('Dispatcher: Running. As Thread = ' + self.name)
        while self.running:
            try:
                event = self.queue.get(True, 0.8)
                if not event.name == 'EXIT':
                    job = event.object
                    self.dispatch(job)
                else:
                    self.running = False
            except Empty:
                self.dispatch_nonconcurrent()
                # Iterate over a copy, because finished jobs are removed
                # from self.concurrent inside the loop.
                for job in list(self.concurrent):
                    if not job.worker.thread.is_alive():
                        self.queue.task_done()
                        self.concurrent.remove(job)
                        del job.worker  # __del__
                        del job

if __name__ == '__main__':
    import time

    def test_func(args=None):
        i = 0
        while i < 5:
            time.sleep(1)
            print('Test_Func: Loop ' + str(i) + ' complete.')
            if args != None:
                print('Test_Func: Arguments = ' + str(args))
            i += 1

    Mediator = mediator.Mediator()
    Mediator.start()
    Dispatcher = Dispatcher(Mediator.queue)
    Dispatcher.start()
    job = Job(func=test_func, concurrent=False, name='Test Func async',
              args='async', callback=test_func)
    Mediator.queue.put(mediator.Event(name='Dispatch_Job', object=job))
    Mediator.queue.put(mediator.Event(name='Dispatch_Job', object=job))
    job = Job(func=test_func, concurrent=True, name='Test Func',
              args='not async', callback=test_func)
    job1 = Job(func=test_func, concurrent=True, name='Test Func',
               args='not async', callback=test_func)
    Mediator.queue.put(mediator.Event(name='Dispatch_Job', object=job))
    Mediator.queue.put(mediator.Event(name='Dispatch_Job', object=job1))
    time.sleep(10)
    Mediator.queue.put(mediator.Event(name='EXIT'))
    Dispatcher.join()