[Python-Dev] futures API
Hello,

I am looking forward to replacing a piece of code (http://code.google.com/p/waf/source/browse/trunk/waflib/Runner.py#86) with the futures module that was announced in the Python 3.2 beta. I am a bit stuck with it, so I have a few questions about the futures:

1. Is the futures API frozen?
2. How hard would it be to return the processed tasks in an output queue, so that the results can be consumed while they are being returned? The code does not seem to be very open to monkey patching.
3. How hard would it be to add new tasks dynamically (after a task is executed) and have the futures object never complete?
4. Is there a performance evaluation of the futures code (execution overhead)?

Thanks,
Thomas
Re: [Python-Dev] futures API
--- On Thu, 9/12/10, Brian Quinlan wrote:
> On Dec 9, 2010, at 4:26 AM, Thomas Nagy wrote:
> > I am looking forward to replacing a piece of code (http://code.google.com/p/waf/source/browse/trunk/waflib/Runner.py#86) with the futures module that was announced in the Python 3.2 beta. I am a bit stuck with it, so I have a few questions about the futures:
> >
> > 1. Is the futures API frozen?
>
> Yes.
>
> > 2. How hard would it be to return the processed tasks in an output queue, so that the results can be consumed while they are being returned? The code does not seem to be very open to monkey patching.
>
> You can associate a callback with a submitted future. That callback could add the future to your queue.

OK, it works. I was thinking the object was cleaned up immediately after it was used.

> > 3. How hard would it be to add new tasks dynamically (after a task is executed) and have the futures object never complete?
>
> I'm not sure that I understand your question. You can submit new work to an Executor at any time until it is shut down, and a work item can take as long to complete as you want. If you are contemplating tasks that never complete, then maybe you would be better off just scheduling a thread.
>
> > 4. Is there a performance evaluation of the futures code (execution overhead)?
>
> No. Scott Dial did make some performance improvements, so he might have a handle on its overhead.

OK. I have a process running for a long time which may use futures with different max_workers counts. I think it is not too far-fetched to create a new futures object each time. Yet, the execution becomes slower after each call, for example with http://freehackers.org/~tnagy/futures_test.py:

"""
import concurrent.futures
from queue import Queue
import datetime

class counter(object):
    def __init__(self, fut):
        self.fut = fut

    def run(self):
        def look_busy(num, obj):
            tot = 0
            for x in range(num):
                tot += x
            obj.out_q.put(tot)

        start = datetime.datetime.utcnow()
        self.count = 0
        self.out_q = Queue(0)
        for x in range(1000):
            self.count += 1
            self.fut.submit(look_busy, self.count, self)

        while self.count:
            self.count -= 1
            self.out_q.get()

        delta = datetime.datetime.utcnow() - start
        print(delta.total_seconds())

fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
for x in range(100):
    # comment the following line
    fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    c = counter(fut)
    c.run()
"""

The runtime grows after each step:
0.216451
0.225186
0.223725
0.74
0.230964
0.240531
0.24137
0.252393
0.249948
0.257153
...

Is there a mistake in this piece of code?

Thanks,
Thomas
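For context, here is a minimal sketch of the callback approach Brian describes: add_done_callback() pushes each finished future into a queue, and a consumer pulls the results as they become available. The names work_item and results are illustrative, not taken from waf or the test script above.

"""
import concurrent.futures
from queue import Queue

# Hypothetical work function, standing in for a real task.
def work_item(n):
    return sum(range(n))

results = Queue()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for n in range(10):
        future = executor.submit(work_item, n)
        # The callback receives the completed Future object itself,
        # so results.put is called with that future once the work is done.
        future.add_done_callback(results.put)

    for _ in range(10):
        finished = results.get()      # blocks until some future completes
        print(finished.result())      # re-raises any exception from work_item
"""

As far as I understand, this also covers question 3 to some extent: new work can be submitted from anywhere, including from a callback, as long as the executor has not been shut down.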
Re: [Python-Dev] futures API
--- On Fri, 10/12/10, Brian Quinlan wrote:
> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
> > I have a process running for a long time which may use futures with different max_workers counts. I think it is not too far-fetched to create a new futures object each time. Yet, the execution becomes slower after each call, for example with http://freehackers.org/~tnagy/futures_test.py:
> >
> > [...]
> >
> > Is there a mistake in this piece of code?
>
> There is no mistake that I can see, but I suspect that the circular references that you are building are causing the ThreadPoolExecutor to take a long time to be collected. Try adding:
>
>   c = counter(fut)
>   c.run()
> + fut.shutdown()
>
> Even if that fixes your problem, I still don't fully understand this because I would expect the runtime to fall after a while as ThreadPoolExecutors are collected.

The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called:
http://www.freehackers.org/~tnagy/runtime_futures.png

After trying to stop the program with CTRL+C, the following error may appear, after which the process cannot be interrupted:

"""
19:18:12 /tmp/build> python3.2 futures_test.py
0.389657
0.417173
0.416513
0.421424
0.449666
0.482273
^CTraceback (most recent call last):
  File "futures_test.py", line 36, in <module>
    c.run()
  File "futures_test.py", line 22, in run
    self.fut.submit(look_busy, self.count, self)
  File "/usr/local/lib/python3.2/concurrent/futures/thread.py", line 114, in submit
    self._work_queue.put(w)
  File "/usr/local/lib/python3.2/queue.py", line 135, in put
    self.not_full.acquire()
KeyboardInterrupt
"""

It is not expected, is it?

Thomas
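As an aside, the shutdown can also be made implicit by using the executor as a context manager, which calls shutdown(wait=True) on exit. A minimal sketch, assuming the same create-an-executor-per-iteration pattern as futures_test.py (look_busy here is a simplified stand-in for the original function):

"""
import concurrent.futures

def look_busy(num):
    return sum(range(num))

for _ in range(5):
    # __exit__ calls executor.shutdown(wait=True), so the worker threads
    # are released promptly instead of lingering until garbage collection.
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(look_busy, n) for n in range(1000)]
        for f in concurrent.futures.as_completed(futures):
            f.result()
"""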
Re: [Python-Dev] futures API
--- On Fri, 10/12/10, Thomas Nagy wrote:
> --- On Fri, 10/12/10, Brian Quinlan wrote:
> > [...]
> > There is no mistake that I can see, but I suspect that the circular references that you are building are causing the ThreadPoolExecutor to take a long time to be collected. Try adding:
> >
> >   c = counter(fut)
> >   c.run()
> > + fut.shutdown()
>
> The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called:
> http://www.freehackers.org/~tnagy/runtime_futures.png
>
> After trying to stop the program with CTRL+C, the following error may appear, after which the process cannot be interrupted:
>
> [...]
>
> It is not expected, is it?

The problem also occurs when using a callback:
http://www.freehackers.org/~tnagy/futures_test2.py

If it is necessary to catch KeyboardInterrupt exceptions to cancel the futures execution, then how about adding this detail to the docs?

Thomas
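To illustrate the workaround being discussed, here is a sketch of catching KeyboardInterrupt in the main thread, cancelling the futures that have not started yet, and shutting the executor down without waiting. The slow_task function and the numbers are hypothetical; futures that are already running still finish their current work item.

"""
import concurrent.futures
import time

# Hypothetical long-running task.
def slow_task(n):
    time.sleep(0.1)
    return n

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(slow_task, n) for n in range(100)]
try:
    for f in concurrent.futures.as_completed(futures):
        print(f.result())
except KeyboardInterrupt:
    # cancel() only succeeds for futures that have not started running;
    # tasks already in progress run to completion.
    for f in futures:
        f.cancel()
    executor.shutdown(wait=False)
    raise
"""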
Re: [Python-Dev] futures API
--- On Fri, 10/12/10, Brian Quinlan wrote:
> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
> > --- On Fri, 10/12/10, Brian Quinlan wrote:
> > > [...]
> > > There is no mistake that I can see, but I suspect that the circular references that you are building are causing the ThreadPoolExecutor to take a long time to be collected. Try adding:
> > >
> > >   c = counter(fut)
> > >   c.run()
> > > + fut.shutdown()
> > >
> > > Even if that fixes your problem, I still don't fully understand this because I would expect the runtime to fall after a while as ThreadPoolExecutors are collected.
> >
> > The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called:
> > http://www.freehackers.org/~tnagy/runtime_futures.png
>
> FWIW, I think that you are confusing the term "future" with "executor". A future represents a single work item. An executor creates futures and schedules their underlying work.

Ah yes, sorry. I have also realized that the executor is not the killer feature I was expecting: it can only replace a small part of the code I have, and controlling the exceptions and the workflow is the most complicated part. I have also observed a minor performance degradation with the executor replacement (3 seconds for 5000 work items). The amount of work items processed per unit of time does not seem to be a straight line:
http://www.freehackers.org/~tnagy/runtime_futures_2.png

Out of curiosity, what is "_thread_references" for?
The source file for the example is:
http://www.freehackers.org/~tnagy/futures_test3.py

The diagram was created by:
http://www.freehackers.org/~tnagy/futures_test3.plot

Thomas
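For reference, a short sketch of the terminology Brian points out: the executor schedules the work, while each future represents one submitted work item.

"""
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(pow, 2, 10)   # one work item -> one Future
    print(future.done())                   # may still be False at this point
    print(future.result())                 # blocks until 1024 is available
"""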
Re: [Python-Dev] futures API
--- On Sat, 11/12/10, Brian Quinlan wrote:
> On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:
> > --- On Fri, 10/12/10, Brian Quinlan wrote:
> > > On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
> > > > [...]
> > > > The shutdown call is indeed a good fix :-) Here is the time response of the calls to counter() when shutdown is not called:
> > > > http://www.freehackers.org/~tnagy/runtime_f