commit:     d389b3b378d88b8c41dfaba2a90bc9643a9ba99e
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Mar  5 06:46:26 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Mar  5 08:06:19 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=d389b3b3

Scheduler: use add_done_callback (bug 711322)

Use add_done_callback instead of addExitListener, in order to avoid
callback races like the SequentialTaskQueue exit listener race that
triggered bug 711322. The addExitListener method is prone to races
because its listeners are executed in quick succession. In contrast,
callbacks scheduled via add_done_callback are placed in a FIFO
queue, ensuring that they execute in an order that is unsurprising
relative to other callbacks.

Bug: https://bugs.gentoo.org/711322
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/_emerge/Scheduler.py | 36 ++++++++++++++++++++++--------------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index 98eaf3bcc..98bc789ff 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -1,9 +1,10 @@
-# Copyright 1999-2019 Gentoo Authors
+# Copyright 1999-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import division, print_function, unicode_literals
 
 from collections import deque
+import functools
 import gc
 import gzip
 import logging
@@ -1259,11 +1260,13 @@ class Scheduler(PollScheduler):
                                child not in completed_tasks:
                                unsatisfied.add(child)
 
-       def _merge_wait_exit_handler(self, task):
+       def _merge_wait_exit_handler(self, task, future):
+               future.cancelled() or future.result()
                self._merge_wait_scheduled.remove(task)
-               self._merge_exit(task)
+               self._merge_exit(task, future)
 
-       def _merge_exit(self, merge):
+       def _merge_exit(self, merge, future):
+               future.cancelled() or future.result()
                self._running_tasks.pop(id(merge), None)
                self._do_merge_exit(merge)
                self._deallocate_config(merge.merge.settings)
@@ -1327,7 +1330,8 @@ class Scheduler(PollScheduler):
                        del mtimedb["resume"]
                mtimedb.commit()
 
-       def _build_exit(self, build):
+       def _build_exit(self, build, future):
+               future.cancelled() or future.result()
                self._running_tasks.pop(id(build), None)
                if build.returncode == os.EX_OK and self._terminated_tasks:
                        # We've been interrupted, so we won't
@@ -1336,7 +1340,7 @@ class Scheduler(PollScheduler):
                        self._deallocate_config(build.settings)
                elif build.returncode == os.EX_OK:
                        self.curval += 1
-                       merge = PackageMerge(merge=build)
+                       merge = PackageMerge(merge=build, 
scheduler=self._sched_iface)
                        self._running_tasks[id(merge)] = merge
                        if not build.build_opts.buildpkgonly and \
                                build.pkg in self._deep_system_deps:
@@ -1345,8 +1349,8 @@ class Scheduler(PollScheduler):
                                self._merge_wait_queue.append(merge)
                                
merge.addStartListener(self._system_merge_started)
                        else:
-                               merge.addExitListener(self._merge_exit)
                                self._task_queues.merge.add(merge)
+                               
merge.async_wait().add_done_callback(functools.partial(self._merge_exit, merge))
                                self._status_display.merges = 
len(self._task_queues.merge)
                else:
                        settings = build.settings
@@ -1365,8 +1369,9 @@ class Scheduler(PollScheduler):
                self._status_display.running = self._jobs
                self._schedule()
 
-       def _extract_exit(self, build):
-               self._build_exit(build)
+       def _extract_exit(self, build, future):
+               future.cancelled() or future.result()
+               self._build_exit(build, future)
 
        def _task_complete(self, pkg):
                self._completed_tasks.add(pkg)
@@ -1580,9 +1585,10 @@ class Scheduler(PollScheduler):
                        if (self._merge_wait_queue and not self._jobs and
                                not self._task_queues.merge):
                                task = self._merge_wait_queue.popleft()
-                               
task.addExitListener(self._merge_wait_exit_handler)
+                               task.scheduler = self._sched_iface
                                self._merge_wait_scheduled.append(task)
                                self._task_queues.merge.add(task)
+                               
task.async_wait().add_done_callback(functools.partial(self._merge_wait_exit_handler,
 task))
                                self._status_display.merges = 
len(self._task_queues.merge)
                                state_change += 1
 
@@ -1699,26 +1705,28 @@ class Scheduler(PollScheduler):
                        task = self._task(pkg)
 
                        if pkg.installed:
-                               merge = PackageMerge(merge=task)
+                               merge = PackageMerge(merge=task, 
scheduler=self._sched_iface)
                                self._running_tasks[id(merge)] = merge
-                               merge.addExitListener(self._merge_exit)
                                self._task_queues.merge.addFront(merge)
+                               
merge.async_wait().add_done_callback(functools.partial(self._merge_exit, merge))
 
                        elif pkg.built:
                                self._jobs += 1
                                self._previous_job_start_time = time.time()
                                self._status_display.running = self._jobs
                                self._running_tasks[id(task)] = task
-                               task.addExitListener(self._extract_exit)
+                               task.scheduler = self._sched_iface
                                self._task_queues.jobs.add(task)
+                               
task.async_wait().add_done_callback(functools.partial(self._extract_exit, task))
 
                        else:
                                self._jobs += 1
                                self._previous_job_start_time = time.time()
                                self._status_display.running = self._jobs
                                self._running_tasks[id(task)] = task
-                               task.addExitListener(self._build_exit)
+                               task.scheduler = self._sched_iface
                                self._task_queues.jobs.add(task)
+                               
task.async_wait().add_done_callback(functools.partial(self._build_exit, task))
 
                return bool(state_change)
 

Reply via email to