commit:     9772f8f2a58a858a80ad1542d1ce46193616be67
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Apr 16 23:55:14 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Apr 17 00:49:09 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=9772f8f2

EventLoop._idle_add: use thread-safe deque append

This fixes previously unsafe usage of self._idle_callbacks when it was
a dictionary. The deque append is thread-safe, but it does *not* notify
the loop's thread, so the caller must notify if appropriate.

Fixes: 1ee8971ba1cb ("EventLoop: eliminate thread safety from call_soon")

 pym/portage/util/_eventloop/EventLoop.py | 90 ++++++++++++++++++++------------
 1 file changed, 58 insertions(+), 32 deletions(-)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index ae5a0a70a..d4f20c6ed 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -3,6 +3,7 @@
 
 from __future__ import division
 
+import collections
 import errno
 import functools
 import logging
@@ -30,7 +31,6 @@ portage.proxy.lazyimport.lazyimport(globals(),
        
'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
 )
 
-from portage import OrderedDict
 from portage.util import writemsg_level
 from portage.util.monotonic import monotonic
 from ..SlotObject import SlotObject
@@ -55,7 +55,7 @@ class EventLoop(object):
                __slots__ = ("callback", "data", "pid", "source_id")
 
        class _idle_callback_class(SlotObject):
-               __slots__ = ("args", "callback", "calling", "source_id")
+               __slots__ = ("_args", "_callback", "_calling", "_cancelled")
 
        class _io_handler_class(SlotObject):
                __slots__ = ("args", "callback", "f", "source_id")
@@ -141,10 +141,10 @@ class EventLoop(object):
                # If this attribute has changed since the last time that the
                # call_soon callbacks have been called, then it's not safe to
                # wait on self._thread_condition without a timeout.
-               self._call_soon_id = 0
-               # Use OrderedDict in order to emulate the FIFO queue behavior
-               # of the AbstractEventLoop.call_soon method.
-               self._idle_callbacks = OrderedDict()
+               self._call_soon_id = None
+               # Use deque, with thread-safe append, in order to emulate the 
FIFO
+               # queue behavior of the AbstractEventLoop.call_soon method.
+               self._idle_callbacks = collections.deque()
                self._timeout_handlers = {}
                self._timeout_interval = None
                self._default_executor = None
@@ -298,7 +298,10 @@ class EventLoop(object):
                                        events_handled += 1
                                timeouts_checked = True
 
-                               call_soon = prev_call_soon_id != 
self._call_soon_id
+                               call_soon = prev_call_soon_id is not 
self._call_soon_id
+                               if self._call_soon_id is not None and 
self._call_soon_id._cancelled:
+                                       # Allow garbage collection of cancelled 
callback.
+                                       self._call_soon_id = None
 
                                if (not call_soon and not event_handlers
                                        and not events_handled and may_block):
@@ -501,8 +504,9 @@ class EventLoop(object):
 
                @type callback: callable
                @param callback: a function to call
-               @rtype: int
-               @return: an integer ID
+               @return: a handle which can be used to cancel the callback
+                       via the source_remove method
+               @rtype: object
                """
                with self._thread_condition:
                        source_id = self._idle_add(callback, *args)
@@ -511,32 +515,51 @@ class EventLoop(object):
 
        def _idle_add(self, callback, *args):
                """Like idle_add(), but without thread safety."""
-               source_id = self._call_soon_id = self._new_source_id()
-               self._idle_callbacks[source_id] = self._idle_callback_class(
-                       args=args, callback=callback, source_id=source_id)
-               return source_id
+               # Hold self._thread_condition when assigning self._call_soon_id,
+               # since it might be modified via a thread-safe method.
+               with self._thread_condition:
+                       handle = self._call_soon_id = self._idle_callback_class(
+                               _args=args, _callback=callback)
+               # This deque append is thread-safe, but it does *not* notify the
+               # loop's thread, so the caller must notify if appropriate.
+               self._idle_callbacks.append(handle)
+               return handle
 
        def _run_idle_callbacks(self):
                # assumes caller has acquired self._thread_rlock
                if not self._idle_callbacks:
                        return False
                state_change = 0
-               # Iterate of our local list, since self._idle_callbacks can be
-               # modified during the exection of these callbacks.
-               for x in list(self._idle_callbacks.values()):
-                       if x.source_id not in self._idle_callbacks:
-                               # it got cancelled while executing another 
callback
-                               continue
-                       if x.calling:
-                               # don't call it recursively
-                               continue
-                       x.calling = True
-                       try:
-                               if not x.callback(*x.args):
-                                       state_change += 1
-                                       self.source_remove(x.source_id)
-                       finally:
-                               x.calling = False
+               reschedule = []
+               # Use remaining count to avoid calling any newly scheduled 
callbacks,
+               # since self._idle_callbacks can be modified during the 
exection of
+               # these callbacks.
+               remaining = len(self._idle_callbacks)
+               try:
+                       while remaining:
+                               remaining -= 1
+                               try:
+                                       x = self._idle_callbacks.popleft() # 
thread-safe
+                               except IndexError:
+                                       break
+                               if x._cancelled:
+                                       # it got cancelled while executing 
another callback
+                                       continue
+                               if x._calling:
+                                       # don't call it recursively
+                                       continue
+                               x._calling = True
+                               try:
+                                       if x._callback(*x._args):
+                                               reschedule.append(x)
+                                       else:
+                                               x._cancelled = True
+                                               state_change += 1
+                               finally:
+                                       x._calling = False
+               finally:
+                       # Reschedule those that were not cancelled.
+                       self._idle_callbacks.extend(reschedule)
 
                return bool(state_change)
 
@@ -732,6 +755,12 @@ class EventLoop(object):
                is found and removed, and False if the reg_id is invalid or has
                already been removed.
                """
+               if isinstance(reg_id, self._idle_callback_class):
+                       if not reg_id._cancelled:
+                               reg_id._cancelled = True
+                               return True
+                       return False
+
                x = self._child_handlers.pop(reg_id, None)
                if x is not None:
                        if not self._child_handlers and self._use_signal:
@@ -741,9 +770,6 @@ class EventLoop(object):
                        return True
 
                with self._thread_rlock:
-                       idle_callback = self._idle_callbacks.pop(reg_id, None)
-                       if idle_callback is not None:
-                               return True
                        timeout_handler = self._timeout_handlers.pop(reg_id, 
None)
                        if timeout_handler is not None:
                                if timeout_handler.interval == 
self._timeout_interval:

Reply via email to