Asynchronous Messaging

2007-09-26 Thread wink
Hello,

I'm getting my feet wet in Python and thought I'd try to see how well
Python works for asynchronous messaging. I've been using asynchronous
messaging for 5 years and find it advantageous for many things. In the
scheme I use, communication is not only asynchronous but also
non-blocking and inherently serialized via a queue.

This provides two benefits: a sender is never directly affected by the
receiver, and since the receiver handles only one message at a time it
generally never needs mutexes or semaphores. This lets the programmer
use multiple threads without contending with the deadly embraces
(deadlocks) and race conditions that mutexes invite.
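
For what it's worth, the pattern can be sketched with the standard
threading and Queue modules. This is only a minimal illustration of
"one consumer, one queue, no locks in the handler" - the names here
(consumer, handled, msgs) are mine, not from mproc:

```python
try:
    import Queue as queue  # Python 2 spelling
except ImportError:
    import queue           # Python 3 spelling
import threading

# One consumer thread drains one queue, so the handler sees exactly
# one message at a time and never needs a mutex of its own.
msgs = queue.Queue()
handled = []               # touched only by the consumer thread

def consumer():
    while True:
        msg = msgs.get()   # blocks until a message is available
        if msg is None:    # sentinel: shut down
            break
        handled.append(msg * 2)   # "process" the message, lock-free

t = threading.Thread(target=consumer)
t.start()
for i in range(3):
    msgs.put(i)            # senders never block on the receiver
msgs.put(None)
t.join()
print(handled)             # [0, 2, 4]
```

The sender's put() returns immediately on an unbounded queue, which is
the non-blocking property described above.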

Anyway, below is a link to the first pass at an implementation
in Python; comments are welcome:

http://www.saville.com/python/mproc.tgz

Regards,

Wink Saville

-- 
http://mail.python.org/mailman/listinfo/python-list


Re: Asynchronous Messaging

2007-09-26 Thread wink
On Sep 26, 4:47 am, wink <[EMAIL PROTECTED]> wrote:
To make it easier to comment on the code, I'm including the
"mproc.py" file below. Fredrik was commenting about using
Queue, and in fact I do. Queue is quite nice and is also
thread safe, which is a requirement for this implementation.
But its performance is poor if the number of items on a
Queue becomes large, because it is implemented using a list.

One of the things I was thinking of was doing another implementation
of Queue based on deque.


"""Message Processor Module.

This modules allows programmers to create compnents
which communicate asynchronously via messages. In
addition the receiving component is will only handle
one message at a time. This allows the programmer to
create multi-threaded program with fewer shared memory
thus fewer mutex's and semaphore's.

The basic communication is via an instance of the
Msg class which is sent to mproc's using the send
method. An Mproc is an active component which has a
thread and executes asynchronously from all other
Mproc's by using one MprocDriver for each Mproc. It
is also possible for several mproc's to share one
MprocDriver by using BaseMproc.

Each mproc must override the _handler method. When a
message arrives for a mproc is it placed in a Queue
and the driver calls the _handler method passing the
Msg as a parameter. The driver uses the Queue to
serialize the message processing, thus the _handler
method will be invoked with one message at a time.
Thus the _handler method does not generally need to
use mutex's or semaphore's. But because each message's
processing must be completed before the next message
will be started it is important that the message be
processed as quickly as possible.

Add more documentation."""

import copy
import threading
import Queue
import traceback

class Msg:
    """A message"""

    def __init__(self):
        """Initializer"""
        self.dstMpId = None
        self.dstCnId = None
        self.srcMpId = None
        self.srcCnId = None
        self.mid = None
        self.cmd = None
        self.tag = None
        self.status = None
        self.data = None

    def dup(self):
        """Duplicate the message"""
        msgCopy = copy.deepcopy(self)
        return msgCopy

    def send(self):
        """Send a message.

        Returns True if the message was started on its way"""
        try:
            MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
            return True
        except:
            return False

class BaseMproc:
    """Message Processor.

    A message processor requires a handler method and a
    driver which passes messages to it. The mproc driver has
    one overriding requirement: it must only pass one message at
    a time to the handler method. This eliminates any need for
    the programmer to worry about multi-threading issues while
    processing messages.

    This does put a burden on the handler routine: it must process
    messages quickly and in a non-blocking fashion so that the
    mproc may remain lively.

    The name of a BaseMproc must be unique; an exception is thrown
    if the name is already used."""

    def __init__(self, name, mprocDriver):
        """Initializer"""
        self.name = name
        addMproc(self, mprocDriver)

    def close(self):
        """Close the mproc"""
        #print "BaseMproc.close: ", self.name
        try:
            rmvMproc(self)
        except:
            #print "BaseMproc.close: exception"
            traceback.print_exc()
        self._unreg()

    def _handler(self):
        """Override this routine."""
        raise Exception("BaseMproc._handler needs to be overridden")

    def _reg(self, mprocDriver, id):
        """Register the mproc driver for this mproc"""
        self._mprocDriver = mprocDriver
        self._msgQueue = mprocDriver._msgQueue
        self.id = id

    def _unreg(self):
        """Unregister the mproc driver for this mproc"""
        self._mprocDriver = None
        self._msgQueue = None
        self.id = None

class Mproc(BaseMproc):
    """Active Message Processor.

    An active message processor is a BaseMproc but it always creates a
    MprocDriver instance as its driver"""

    def __init__(self, name):
        """Initializer"""
        BaseMproc.__init__(self, name,
                           MprocDriver("ActiveMprocDriver_" + name))

    def close(self):
        """Close the active mproc"""
        try:
            this_mprocDriver = s

Re: Asynchronous Messaging

2007-09-26 Thread wink
Fredrik,

You are most correct: Queue is slow compared to deque, but
not for the reason I guessed. Apparently it's because deque is
implemented in C while Queue is in Python. Using the program below,
it looks like there is about a 35:1 speed difference.

100 d.append  0.000011s  0.1097us per
100 d.popL    0.000011s  0.1097us per
100 q.put     0.000429s  4.2892us per
100 q.get     0.000391s  3.9077us per

So someday it _might_ warrant adding Queue to collections.


#!/usr/bin/python

import timeit

setupD = """
from collections import deque
d = deque()
cnt = %d
for x in xrange(cnt):
    d.append(x)
"""

setupQ = """
import Queue
q = Queue.Queue()
cnt = %d
for x in xrange(cnt):
    q.put(x)
"""

def main():
    cnt = 100

    t = timeit.Timer(setup=setupD % cnt, stmt="d.append(0)")
    time = min(t.repeat(1, cnt))
    print "  %9d d.append  %fs %7.4fus per" % \
          (cnt, time, (time / cnt) * 1000000.0)

    t = timeit.Timer(setup=setupD % cnt, stmt="d.popleft()")
    time = min(t.repeat(1, cnt))
    print "  %9d d.popL    %fs %7.4fus per" % \
          (cnt, time, (time / cnt) * 1000000.0)

    t = timeit.Timer(setup=setupQ % cnt, stmt="q.put(0)")
    time = min(t.repeat(1, cnt))
    print "  %9d q.put     %fs %7.4fus per" % \
          (cnt, time, (time / cnt) * 1000000.0)

    t = timeit.Timer(setup=setupQ % cnt, stmt="q.get()")
    time = min(t.repeat(1, cnt))
    print "  %9d q.get     %fs %7.4fus per" % \
          (cnt, time, (time / cnt) * 1000000.0)

if __name__ == "__main__":
    main()



Re: Asynchronous Messaging

2007-09-27 Thread wink
>
> That's not the reason. A Queue is built around a container, and it happens
> to be a deque in the default implementation. But the important thing is
> that a Queue is a synchronized object - it performs the necessary
> synchronization to ensure proper operation even from multiple threads
> attempting to use it at the same time.
> So your comparison is meaningless, apart from telling you that using
> mutexes is not cheap.
>
> --
> Gabriel Genellina

Interesting. The documentation for deque says
"Deques support thread-safe, ..." which would seem to
imply a mutex of some sort is used. But in
looking at collectionsmodule.c for 2.5.1 I don't see
any mutex, which would seem to imply the locking is
somewhere else, maybe the GIL, or the documentation is
incorrect.

On the other hand, looking at the code in Queue.py there
is simply more code: the not_full Condition variable, plus
the call to _put (which is a call to deque.append), plus
a notify on the not_empty Condition, so it's not
surprising it's slower.
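
For what it's worth, you can see roughly how much of the gap is
synchronization rather than the container by timing a bare
deque.append against the same append wrapped in a Lock. This is only a
rough sketch with names of my own choosing, and a plain Lock is a
crude stand-in for the Condition bookkeeping Queue.put actually does:

```python
import threading
import timeit
from collections import deque

d = deque()
lock = threading.Lock()

def plain_append():
    d.append(0)

def locked_append():
    # crude stand-in for Queue.put: take a lock around the same append
    with lock:
        d.append(0)

n = 100000
t_plain = timeit.Timer(plain_append).timeit(number=n)
t_locked = timeit.Timer(locked_append).timeit(number=n)
print("plain  %.4fus per" % (t_plain / n * 1e6))
print("locked %.4fus per" % (t_locked / n * 1e6))
```

The locked version is typically several times slower even though the
underlying append is identical, which is Gabriel's point: the cost is
the synchronization, not the container.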

If and when I think Queue is a performance impediment
to an mproc I'll take another look at it; for now there are
lots of things to learn.

Thanks,

Wink



Pythonic way for handling file errors

2007-10-10 Thread wink
Hello,

I would like to know what would be considered the most
Pythonic way of handling errors when dealing with files.
Here are some solutions that seem reasonable using 2.5:
---
try:
    f = open('afile', 'r')
    content = f.read()
    error = 200
except Exception:
    error = 404
finally:
    if locals().has_key('f'):
        f.close()
---
try:
    f = open('afile', 'r')
    content = f.read()
except Exception:
    error = 404
else:
    error = 200
finally:
    if locals().has_key('f'):
        f.close()
---
try:
    f = open('afile', 'r')
    content = f.read()
    error = 200
except Exception:
    error = 404
finally:
    try:
        f.close()
    except Exception:
        pass
---
f = None
try:
    f = open('afile', 'r')
    content = f.read()
    error = 200
except Exception:
    error = 404
finally:
    if f:
        f.close()
---
try:
    with open('afile', 'r') as f:
        content = f.read()
        error = 200
except Exception:
    error = 404


Of the above I like the last one best, but I'd
really like to have:

with open('afile', 'r') as f with exceptions:
    content = f.read()
    error = 200
except Exception:
    error = 404

In other words, from looking at PEP 343, it is the author
of the object returned by the with expression who gets
to decide whether exceptions are re-raised. But it would
seem to me it should be the programmer using it who
decides.
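
For what it's worth, PEP 343 does let the programmer build that
behavior: contextlib.contextmanager (new in 2.5) lets a user-written
manager swallow exceptions itself. This is only a sketch - quiet_open
and its handling of a failed open are my own invention, not stdlib:

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def quiet_open(path, mode='r'):
    # Hypothetical helper: open inside the manager and swallow any
    # exception, whether from open() itself or from the with-body.
    try:
        f = open(path, mode)
    except Exception:
        f = None               # body gets None and will fail quietly
    try:
        yield f
    except Exception:
        pass                   # swallowed: not re-raised at the call site
    finally:
        if f is not None:
            f.close()

# Failure case: no try/except needed at the call site.
error = 404
with quiet_open('no-such-file') as f:
    content = f.read()         # AttributeError on None, swallowed
    error = 200
print(error)                   # 404

# Success case.
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
tmp.write('hello')
tmp.close()
error = 404
with quiet_open(tmp.name) as f:
    content = f.read()
    error = 200
os.unlink(tmp.name)
print(error)                   # 200
```

When the generator catches the exception thrown into it at the yield
point and returns normally, contextmanager's __exit__ returns True,
which is exactly the "don't re-raise" decision - it just lives in the
manager you wrote rather than in new with-statement syntax.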

Of course as a newbie, I may be way off base.

Thanks,

Wink Saville
