commit:     61878e4fbdfef5f8512b34640089e954a14e6d12
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 07:06:12 2017 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:25 2017 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=61878e4f

MirrorDistTask: terminate via call_soon for asyncio compat

These changes are analogous to the PollScheduler changes in
the previous commit.

 pym/portage/_emirrordist/MirrorDistTask.py | 39 +++++++++++++++++++++---------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
index e23a11b3c..d6b3decc0 100644
--- a/pym/portage/_emirrordist/MirrorDistTask.py
+++ b/pym/portage/_emirrordist/MirrorDistTask.py
@@ -24,15 +24,16 @@ if sys.hexversion >= 0x3000000:
 
 class MirrorDistTask(CompositeTask):
 
-       __slots__ = ('_config', '_terminated', '_term_check_id')
+       __slots__ = ('_config', '_fetch_iterator', '_term_rlock',
+               '_term_callback_handle')
 
        def __init__(self, config):
                CompositeTask.__init__(self, scheduler=config.event_loop)
                self._config = config
-               self._terminated = threading.Event()
+               self._term_rlock = threading.RLock()
+               self._term_callback_handle = None
 
        def _start(self):
-               self._term_check_id = self.scheduler.idle_add(self._termination_check)
                fetch = TaskScheduler(iter(FetchIterator(self._config)),
                        max_jobs=self._config.options.jobs,
                        max_load=self._config.options.load_average,
@@ -203,17 +204,31 @@ class MirrorDistTask(CompositeTask):
                logging.info("added %i files" % added_file_count)
                logging.info("added %i bytes total" % added_byte_count)
 
+       def _cleanup(self):
+               """
+               Cleanup any callbacks that have been registered with the global
+               event loop.
+               """
+               # The self._term_callback_handle attribute requires locking
+               # since it's modified by the thread safe terminate method.
+               with self._term_rlock:
+                       if self._term_callback_handle not in (None, False):
+                               self._term_callback_handle.cancel()
+                       # This prevents the terminate method from scheduling
+                       # any more callbacks (since _cleanup must eliminate all
+                       # callbacks in order to ensure complete cleanup).
+                       self._term_callback_handle = False
+
        def terminate(self):
-               self._terminated.set()
+               with self._term_rlock:
+                       if self._term_callback_handle is None:
+                               self._term_callback_handle = self.scheduler.call_soon_threadsafe(
+                                       self._term_callback)
 
-       def _termination_check(self):
-               if self._terminated.is_set():
-                       self.cancel()
-                       self.wait()
-               return True
+       def _term_callback(self):
+               self.cancel()
+               self.wait()
 
        def _wait(self):
                CompositeTask._wait(self)
-               if self._term_check_id is not None:
-                       self.scheduler.source_remove(self._term_check_id)
-                       self._term_check_id = None
+               self._cleanup()

Reply via email to