[Python-Dev] futures API

2010-12-09 Thread Thomas Nagy
Hello,

I am looking forward to replacing a piece of code 
(http://code.google.com/p/waf/source/browse/trunk/waflib/Runner.py#86) with the 
futures module announced in the Python 3.2 beta. I am a bit stuck with it, so I 
have a few questions about futures:

1. Is the futures API frozen?
2. How hard would it be to return the processed tasks in an output queue, so 
that the results can be consumed as they are returned? The code does not seem 
very open to monkey patching.
3. How hard would it be to add new tasks dynamically (after a task has 
executed) and have the futures object never complete?
4. Is there a performance evaluation of the futures code (execution overhead)?

Thanks,
Thomas



  
___
Python-Dev mailing list
Python-Dev@python.org
http://mail.python.org/mailman/listinfo/python-dev
Unsubscribe: 
http://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com


Re: [Python-Dev] futures API

2010-12-10 Thread Thomas Nagy
--- On Thu, 9/12/10, Brian Quinlan wrote:
> On Dec 9, 2010, at 4:26 AM, Thomas Nagy wrote:
> 
> > I am looking forward to replacing a piece of code 
> > (http://code.google.com/p/waf/source/browse/trunk/waflib/Runner.py#86) 
> > by the futures module which was announced in python 3.2 
> > beta. I am a bit stuck with it, so I have a few questions 
> > about the futures:
> > 
> > 1. Is the futures API frozen?
> 
> Yes.
> 
> > 2. How hard would it be to return the tasks processed 
> > in an output queue to process/consume the results while they 
> > are returned? The code does not seem to be very open for 
> > monkey patching.
> 
> You can associate a callback with a submitted future. That
> callback could add the future to your queue.

Ok, it works. I was thinking the object was cleaned up immediately after it was 
used.
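
For the record, a minimal sketch of the callback approach Brian describes. The worker function and names here are illustrative, not part of the futures API; only `submit` and `add_done_callback` come from `concurrent.futures`:

```python
import concurrent.futures
import queue

def square(n):
    return n * n

# Each future pushes itself onto done_q when it completes, so results
# can be consumed as they arrive rather than after everything finishes.
done_q = queue.Queue()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
    for n in range(10):
        ex.submit(square, n).add_done_callback(done_q.put)
    results = sorted(done_q.get().result() for _ in range(10))

print(results)
```

The callback receives the completed Future itself, so `done_q.put` works directly as the callback and the consumer calls `.result()` on what it dequeues.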

> > 3. How hard would it be to add new tasks dynamically 
> > (after a task is executed) and have the futures object never 
> > complete?
> 
> I'm not sure that I understand your question. You can 
> submit new work to an Executor at any time until it is 
> shut down, and a work item can take as long to complete as 
> you want. If you are contemplating tasks that never 
> complete then maybe you would be better off just scheduling 
> a thread.
> 
> > 4. Is there a performance evaluation of the futures 
> > code (execution overhead)?
> 
> No. Scott Dial did make some performance improvements so he
> might have a handle on its overhead.

Ok.
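
Regarding question 3, a hedged sketch of what Brian suggests is possible: new work can be submitted at any time before shutdown, so workers can report follow-up items and a consumer loop keeps submitting until nothing is pending. All names here are illustrative; the executor itself never "completes", the loop decides when to stop:

```python
import concurrent.futures
import queue

def process(item):
    # Return the result plus any follow-up items this task spawns.
    followups = [item + 1] if item < 3 else []
    return item * 10, followups

out_q = queue.Queue()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
    ex.submit(process, 0).add_done_callback(out_q.put)
    pending = 1
    results = []
    while pending:
        fut = out_q.get()          # block for the next completed task
        pending -= 1
        value, followups = fut.result()
        results.append(value)
        for item in followups:     # dynamically submit new work
            ex.submit(process, item).add_done_callback(out_q.put)
            pending += 1

print(sorted(results))
```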

I have a long-running process which may use executors with different 
max_workers counts. I think it is not too far-fetched to create a new executor 
each time. Yet, the execution becomes slower after each call, for example with 
http://freehackers.org/~tnagy/futures_test.py:

"""
import concurrent.futures
from queue import Queue
import datetime

class counter(object):
    def __init__(self, fut):
        self.fut = fut

    def run(self):
        def look_busy(num, obj):
            tot = 0
            for x in range(num):
                tot += x
            obj.out_q.put(tot)

        start = datetime.datetime.utcnow()
        self.count = 0
        self.out_q = Queue(0)
        for x in range(1000):
            self.count += 1
            self.fut.submit(look_busy, self.count, self)

        while self.count:
            self.count -= 1
            self.out_q.get()

        delta = datetime.datetime.utcnow() - start
        print(delta.total_seconds())

fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
for x in range(100):
    # comment the following line
    fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    c = counter(fut)
    c.run()
"""

The runtime grows after each step:
0.216451
0.225186
0.223725
0.74
0.230964
0.240531
0.24137
0.252393
0.249948
0.257153
...

Is there a mistake in this piece of code?

Thanks,
Thomas



  


Re: [Python-Dev] futures API

2010-12-10 Thread Thomas Nagy
--- El vie, 10/12/10, Brian Quinlan escribió:
> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote: 
> > I have a process running for a long time, and which 
> > may use futures of different max_workers count. I think it 
> > is not too far-fetched to create a new futures object each 
> > time. Yet, the execution becomes slower after each call, for 
> > example with http://freehackers.org/~tnagy/futures_test.py:
> > 
> > """
> > import concurrent.futures
> > from queue import Queue
> > import datetime
> > 
> > class counter(object):
> >     def __init__(self, fut):
> >         self.fut = fut
> > 
> >     def run(self):
> >         def
> look_busy(num, obj):
> >         
>    tot = 0
> >         
>    for x in range(num):
> >             
>    tot += x
> >         
>    obj.out_q.put(tot)
> > 
> >         start =
> datetime.datetime.utcnow()
> >         self.count = 0
> >         self.out_q =
> Queue(0)
> >         for x in
> range(1000):
> >         
>    self.count += 1
> >         
>    self.fut.submit(look_busy, self.count,
> self)
> > 
> >         while
> self.count:
> >         
>    self.count -= 1
> >         
>    self.out_q.get()
> > 
> >         delta =
> datetime.datetime.utcnow() - start
> >     
>    print(delta.total_seconds())
> > 
> > fut =
> concurrent.futures.ThreadPoolExecutor(max_workers=20)
> > for x in range(100):
> >     # comment the following line
> >     fut =
> concurrent.futures.ThreadPoolExecutor(max_workers=20)
> >     c = counter(fut)
> >     c.run()
> > """
> > 
> > The runtime grows after each step:
> > 0.216451
> > 0.225186
> > 0.223725
> > 0.74
> > 0.230964
> > 0.240531
> > 0.24137
> > 0.252393
> > 0.249948
> > 0.257153
> > ...
> > 
> > Is there a mistake in this piece of code?
> 
> There is no mistake that I can see but I suspect that the
> circular references that you are building are causing the
> ThreadPoolExecutor to take a long time to be collected. Try
> adding:
> 
>     c = counter(fut)
>     c.run()
> +    fut.shutdown()
> 
> Even if that fixes your problem, I still don't fully
> understand this because I would expect the runtime to fall
> after a while as ThreadPoolExecutors are collected.

The shutdown call is indeed a good fix :-) Here is the time response of the 
calls to counter() when shutdown is not called:
http://www.freehackers.org/~tnagy/runtime_futures.png
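
For completeness, an Executor can also be used as a context manager, which calls shutdown(wait=True) on exit, so short-lived executors do not accumulate worker threads. A sketch with an illustrative workload:

```python
import concurrent.futures

def work(n):
    return sum(range(n))

totals = []
for _ in range(3):
    # Each iteration gets a fresh executor; leaving the "with" block
    # shuts it down and joins its worker threads before the next one.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
        futs = [ex.submit(work, 100) for _ in range(8)]
        totals.append(sum(f.result() for f in futs))

print(totals)
```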

After trying to stop the program by using CTRL+C, the following error may 
appear, after which the process cannot be interrupted:

"""
19:18:12 /tmp/build> python3.2 futures_test.py
0.389657
0.417173
0.416513
0.421424
0.449666
0.482273
^CTraceback (most recent call last):
  File "futures_test.py", line 36, in 
    c.run()
  File "futures_test.py", line 22, in run
    self.fut.submit(look_busy, self.count, self)
  File "/usr/local/lib/python3.2/concurrent/futures/thread.py", line 114, in 
submit
    self._work_queue.put(w)
  File "/usr/local/lib/python3.2/queue.py", line 135, in put
    self.not_full.acquire()
KeyboardInterrupt
"""

It is not expected, is it?

Thomas



  


Re: [Python-Dev] futures API

2010-12-10 Thread Thomas Nagy
--- On Fri, 10/12/10, Thomas Nagy wrote:
> --- On Fri, 10/12/10, Brian Quinlan wrote:
> > [earlier quoted code, timings, fix, and traceback snipped]
> 
> It is not expected, is it?

The problem also occurs when using a callback:
http://www.freehackers.org/~tnagy/futures_test2.py

If it is necessary to catch KeyboardInterrupt exceptions to cancel the futures 
execution, then how about adding this detail to the docs?
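
Until the docs cover it, a sketch of the workaround being discussed here (illustrative worker function; note that cancel() only stops futures that have not started running, and already-running work items still finish):

```python
import concurrent.futures

def work(n):
    return n * n

ex = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futs = [ex.submit(work, n) for n in range(100)]
try:
    results = [f.result() for f in futs]
except KeyboardInterrupt:
    for f in futs:
        f.cancel()           # best effort: only pending futures cancel
    ex.shutdown(wait=False)  # don't block on the remaining work
    raise
else:
    ex.shutdown()

print(len(results))
```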

Thomas



  


Re: [Python-Dev] futures API

2010-12-11 Thread Thomas Nagy
--- On Fri, 10/12/10, Brian Quinlan wrote:
> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
> > [earlier quoted code, timings, and shutdown discussion snipped]
> >
> > The shutdown call is indeed a good fix :-) Here is the time response 
> > of the calls to counter() when shutdown is not called:
> > http://www.freehackers.org/~tnagy/runtime_futures.png
> 
> FWIW, I think that you are confusing the term "future" 
> with "executor". A future represents a single work item. An 
> executor creates futures and schedules their underlying work.

Ah yes, sorry. I have also realized that the executor is not the killer feature 
I was expecting: it can only replace a small part of the code I have, and 
controlling the exceptions and the workflow is the most complicated part.
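
To restate the terminology in code (a trivial illustrative sketch, not from the thread):

```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # The executor schedules work; submit() returns a Future,
    # which represents that single work item.
    future = executor.submit(pow, 2, 10)
    result = future.result()  # block until the work item finishes

print(result)
```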

I have also observed a minor performance degradation with the executor 
replacement (3 seconds for 5000 work items). The number of work items processed 
per unit of time does not seem to be a straight line: 
http://www.freehackers.org/~tnagy/runtime_futures_2.png . Out of curiosity, 
what is the "_thread_references" for?

The source file for the example is in:
http://www.freehackers.org/~tnagy/futures_test3.py

The diagram was created by:
http://www.freehackers.org/~tnagy/futures_test3.plot

Thomas



  


Re: [Python-Dev] futures API

2010-12-11 Thread Thomas Nagy
--- On Sat, 11/12/10, Brian Quinlan wrote:
> 
> On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
> 
> > [earlier quoted code, timings, and shutdown discussion snipped]
> >
> > > The shutdown call is indeed a good fix :-) Here is the time response
> > > of the calls to counter() when shutdown is not called:
> > > http://www.freehackers.org/~tnagy/runtime_f