Asynchronous Messaging
Hello, I'm getting my feet wet in Python and thought I'd try to see how well Python works for asynchronous messaging. I've been using asynchronous messaging for 5 years and find it advantageous for many things. In the scheme I use communication is not only asynchronous but it is also non- blocking and inherently serialized via a queue. This provides two benefits, a sender is never directly effected by the receiver and since the receiver handles only one message at a time it generally never has to use mutexes or semaphores. This allows for the programmer to use multiple threads without having to contend with the associated issues mutexes have in the area of deadly embraces or race conditions. Anyway, below is a link to the first pass at an implementation in Python, comments are welcome: http://www.saville.com/python/mproc.tgz Regards, Wink Saville -- http://mail.python.org/mailman/listinfo/python-list
Re: Asynchronous Messaging
On Sep 26, 4:47 am, wink <[EMAIL PROTECTED]> wrote:
To make it easier to comment on the code, I'm including
"mproc.py" file below. Fredrik, was commenting about using
Queue and in fact I do. Queue is quite nice and is also
thread safe, which is a requirement for this implementation.
But its performance is poor if the number of items on a
Queue becomes large because it is implemented using a list.
One of the things I was thinking of was doing another implementation
using of Queue which was based on deque.
"""Message Processor Module.
This modules allows programmers to create compnents
which communicate asynchronously via messages. In
addition the receiving component is will only handle
one message at a time. This allows the programmer to
create multi-threaded program with fewer shared memory
thus fewer mutex's and semaphore's.
The basic communication is via an instance of the
Msg class which is sent to mproc's using the send
method. An Mproc is an active component which has a
thread and executes asynchronously from all other
Mproc's by using one MprocDriver for each Mproc. It
is also possible for several mproc's to share one
MprocDriver by using BaseMproc.
Each mproc must override the _handler method. When a
message arrives for a mproc is it placed in a Queue
and the driver calls the _handler method passing the
Msg as a parameter. The driver uses the Queue to
serialize the message processing, thus the _handler
method will be invoked with one message at a time.
Thus the _handler method does not generally need to
use mutex's or semaphore's. But because each message's
processing must be completed before the next message
will be started it is important that the message be
processed as quickly as possible.
Add more documentation."""
import copy
import threading
import Queue
import traceback
class Msg:
"""A message"""
def __init__(self):
"""Initializer"""
self.dstMpId = None
self.dstCnId = None
self.srcMpId = None
self.srcCnId = None
self.mid = None
self.cmd = None
self.tag = None
self.status = None
self.data = None
def dup(self):
"""Duplicate the message"""
msgCopy = copy.deepcopy(self)
return msgCopy
def send(self):
"""Send a message.
Returns True if the message was started on its way"""
try:
MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
return True
except:
return False
class BaseMproc:
"""Message Processor.
A message processor requires a handler method and another
driver which passes messages to it. This mproc driver has
one overriding requirement, it must only pass one message at
a time to the handler method. This eliminates any need for
the programmer to worry about multi-threading issues while
processing messages.
This does put a burden on the handler routine, it must process
messages quickly and in a non-blocking fashion so that the
mproc may remain lively.
The name of an BaseMproc must be unique, an exception is thrown
if the name is already used."""
def __init__(self, name, mprocDriver):
"""Initializer"""
self.name = name
addMproc(self, mprocDriver)
def close(self):
"""Close the mproc"""
#print "BaseMproc.close: ", self.name
try:
rmvMproc(self)
except:
#print "BaseMproc.close: excption"
traceback.print_exc()
self._unreg()
def _handler(self):
"""Override this routine."""
raise Exception("BaseMproc._handler needs to be overridden")
def _reg(self, mprocDriver, id):
"""Register the mproc driver for this mproc"""
self._mprocDriver = mprocDriver
self._msgQueue = mprocDriver._msgQueue
self.id = id
def _unreg(self):
"""Unregister the mproc driver for this mproc"""
self._mprocDriver = None
self._msgQueue = None
self.id = None
class Mproc(BaseMproc):
"""Active Message Processor.
An active message processor isa BaseMproc but it always creates a
MprocDriver instance as its driver"""
def __init__(self, name):
"""Initializer"""
BaseMproc.__init__(self, name,
MprocDriver("ActiveMprocDriver_" + name))
def close(self):
"""Close the active mproc"""
try:
this_mprocDriver = s
Re: Asynchronous Messaging
Fredrik, You are most correct, but Queue is slow compared to deque but not for the reason I guessed. Apparently it's because deque is implemented in C while Queue is in python. Using the program below it looks there is about a 35:1 speed difference. 100 d.append 0.11s 0.1097us per 100 d.popL0.11s 0.1097us per 100 q.put 0.000429s 4.2892us per 100 q.get 0.000391s 3.9077us per So someday it _might_ warrant adding Queue to collections. #!/usr/bin/python import timeit import Queue from collections import deque setupD = """ from collections import deque d=deque() cnt = %d for x in xrange(cnt): d.append(x) """ setupQ = """ import Queue q=Queue.Queue() cnt = %d for x in xrange(cnt): q.put(x) """ def main(): cnt = 100 t = timeit.Timer(setup=setupD % cnt, stmt="d.append(0)") time = min(t.repeat(1, cnt)) print " %9d d.append %fs %7.4fus per" % \ (cnt, time, (time/cnt) * 100.0) t = timeit.Timer(setup=setupD % cnt, stmt="d.popleft()") time = min(t.repeat(1, cnt)) print " %9d d.popL%fs %7.4fus per" % \ (cnt, time, (time/cnt) * 100.0) t = timeit.Timer(setup=setupQ % cnt, stmt="q.put(0)") time = min(t.repeat(1, cnt)) print " %9d q.put %fs %7.4fus per" % \ (cnt, time, (time/cnt) * 100.0) t = timeit.Timer(setup=setupQ % cnt, stmt="q.get()") time = min(t.repeat(1, cnt)) print " %9d q.get %fs %7.4fus per" % \ (cnt, time, (time/cnt) * 100.0) if __name__ == "__main__": main() -- http://mail.python.org/mailman/listinfo/python-list
Re: Asynchronous Messaging
> > That't not the reason. A Queue is built around a container, and it happens > to be a deque in the default implementation. But the important thing is > that a Queue is a synchronized object - it performs the necesary > synchronization to ensure proper operation even from multiple threads > attempting to use it at the same time. > So your comparison is meaningless, apart from telling that using mutexes > is not cheap. > > -- > Gabriel Genellina Interesting, from the documentation for deque says; "Deques support thread-safe, ..." which would seem to imply a mutex of some sort would be used. But in looking at collectionsmodule.c for 2.5.1 I don't see any mutex which would seem to imply there is one place else, maybe the GIL, or the documentation is incorrect. On the other hand looking at the code in Queue.py there is a more code plus the not_full Condition variable plus the call to _put (which is a call to the deque.append) plus a notify on the not_empty conditional, so it's not surprising it's slower. If and when I think Queue is a performance impediment to an mproc I'll take another look at it, for now lots of things to learn. Thanks, Wink -- http://mail.python.org/mailman/listinfo/python-list
Pythonic way for handling file errors
Hello,
I would like to know what would be considered the most
Pythonic way of handling errors when dealing with files,
solutions that seem reasonable using 2.5:
---
try:
f = open('afile', 'r')
content = f.read()
error = 200
except Exception:
error = 404
finally:
if locals().has_key('f'):
f.close()
--
try:
f = open('afile', 'r')
content = f.read()
except Exception:
error = 404
else:
error = 200
finally:
if locals().has_key('f'):
f.close()
---
try:
f = open('afile', 'r')
content = f.read()
error = 200
except Exception:
error = 404
finally:
try:
f.close()
except Exception:
pass
---
try:
f = None
f = open('afile', 'r')
content = f.read()
error = 200
except Exception:
error = 404
finally:
if f:
f.close()
try:
with open('afile', 'r') as f:
content = f.read()
error = 200
except Exception:
error = 404
Of the above I think I like the last one best, but I think
I'd really like to have:
with open('afile', 'r') as f with exceptions:
content = f.read()
error = 200
except Exception:
error = 404
Another words from looking at PEP343 it is the author
of the object returned by the with expression that gets
to decide if exceptions are re-raised. But it would seem
to me it should be the programmer using it that should
decide.
Of course as a newbie, I may be way off base.
Thanks,
Wink Saville
--
http://mail.python.org/mailman/listinfo/python-list
