commit: 9772f8f2a58a858a80ad1542d1ce46193616be67
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Apr 16 23:55:14 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Apr 17 00:49:09 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=9772f8f2
EventLoop._idle_add: use thread-safe deque append
This fixes previously unsafe usage of self._idle_callbacks when it was
a dictionary. The deque append is thread-safe, but it does *not* notify
the loop's thread, so the caller must notify if appropriate.
Fixes: 1ee8971ba1cb ("EventLoop: eliminate thread safety from call_soon")
pym/portage/util/_eventloop/EventLoop.py | 90 ++++++++++++++++++++------------
1 file changed, 58 insertions(+), 32 deletions(-)
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index ae5a0a70a..d4f20c6ed 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -3,6 +3,7 @@
from __future__ import division
+import collections
import errno
import functools
import logging
@@ -30,7 +31,6 @@ portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
)
-from portage import OrderedDict
from portage.util import writemsg_level
from portage.util.monotonic import monotonic
from ..SlotObject import SlotObject
@@ -55,7 +55,7 @@ class EventLoop(object):
__slots__ = ("callback", "data", "pid", "source_id")
class _idle_callback_class(SlotObject):
- __slots__ = ("args", "callback", "calling", "source_id")
+ __slots__ = ("_args", "_callback", "_calling", "_cancelled")
class _io_handler_class(SlotObject):
__slots__ = ("args", "callback", "f", "source_id")
@@ -141,10 +141,10 @@ class EventLoop(object):
# If this attribute has changed since the last time that the
# call_soon callbacks have been called, then it's not safe to
# wait on self._thread_condition without a timeout.
- self._call_soon_id = 0
- # Use OrderedDict in order to emulate the FIFO queue behavior
- # of the AbstractEventLoop.call_soon method.
- self._idle_callbacks = OrderedDict()
+ self._call_soon_id = None
+ # Use deque, with thread-safe append, in order to emulate the FIFO
+ # queue behavior of the AbstractEventLoop.call_soon method.
+ self._idle_callbacks = collections.deque()
self._timeout_handlers = {}
self._timeout_interval = None
self._default_executor = None
@@ -298,7 +298,10 @@ class EventLoop(object):
events_handled += 1
timeouts_checked = True
- call_soon = prev_call_soon_id != self._call_soon_id
+ call_soon = prev_call_soon_id is not self._call_soon_id
+ if self._call_soon_id is not None and self._call_soon_id._cancelled:
+ # Allow garbage collection of cancelled callback.
+ self._call_soon_id = None
if (not call_soon and not event_handlers
and not events_handled and may_block):
@@ -501,8 +504,9 @@ class EventLoop(object):
@type callback: callable
@param callback: a function to call
- @rtype: int
- @return: an integer ID
+ @return: a handle which can be used to cancel the callback
+ via the source_remove method
+ @rtype: object
"""
with self._thread_condition:
source_id = self._idle_add(callback, *args)
@@ -511,32 +515,51 @@ class EventLoop(object):
def _idle_add(self, callback, *args):
"""Like idle_add(), but without thread safety."""
- source_id = self._call_soon_id = self._new_source_id()
- self._idle_callbacks[source_id] = self._idle_callback_class(
- args=args, callback=callback, source_id=source_id)
- return source_id
+ # Hold self._thread_condition when assigning self._call_soon_id,
+ # since it might be modified via a thread-safe method.
+ with self._thread_condition:
+ handle = self._call_soon_id = self._idle_callback_class(
+ _args=args, _callback=callback)
+ # This deque append is thread-safe, but it does *not* notify the
+ # loop's thread, so the caller must notify if appropriate.
+ self._idle_callbacks.append(handle)
+ return handle
def _run_idle_callbacks(self):
# assumes caller has acquired self._thread_rlock
if not self._idle_callbacks:
return False
state_change = 0
- # Iterate of our local list, since self._idle_callbacks can be
- # modified during the exection of these callbacks.
- for x in list(self._idle_callbacks.values()):
- if x.source_id not in self._idle_callbacks:
- # it got cancelled while executing another callback
- continue
- if x.calling:
- # don't call it recursively
- continue
- x.calling = True
- try:
- if not x.callback(*x.args):
- state_change += 1
- self.source_remove(x.source_id)
- finally:
- x.calling = False
+ reschedule = []
+ # Use remaining count to avoid calling any newly scheduled callbacks,
+ # since self._idle_callbacks can be modified during the execution of
+ # these callbacks.
+ remaining = len(self._idle_callbacks)
+ try:
+ while remaining:
+ remaining -= 1
+ try:
+ x = self._idle_callbacks.popleft() # thread-safe
+ except IndexError:
+ break
+ if x._cancelled:
+ # it got cancelled while executing another callback
+ continue
+ if x._calling:
+ # don't call it recursively
+ continue
+ x._calling = True
+ try:
+ if x._callback(*x._args):
+ reschedule.append(x)
+ else:
+ x._cancelled = True
+ state_change += 1
+ finally:
+ x._calling = False
+ finally:
+ # Reschedule those that were not cancelled.
+ self._idle_callbacks.extend(reschedule)
return bool(state_change)
@@ -732,6 +755,12 @@ class EventLoop(object):
is found and removed, and False if the reg_id is invalid or has
already been removed.
"""
+ if isinstance(reg_id, self._idle_callback_class):
+ if not reg_id._cancelled:
+ reg_id._cancelled = True
+ return True
+ return False
+
x = self._child_handlers.pop(reg_id, None)
if x is not None:
if not self._child_handlers and self._use_signal:
@@ -741,9 +770,6 @@ class EventLoop(object):
return True
with self._thread_rlock:
- idle_callback = self._idle_callbacks.pop(reg_id, None)
- if idle_callback is not None:
- return True
timeout_handler = self._timeout_handlers.pop(reg_id, None)
if timeout_handler is not None:
if timeout_handler.interval == self._timeout_interval: