tfiala created this revision.
tfiala added reviewers: labath, zturner.
tfiala added a subscriber: lldb-commits.

This makes one of the internal process_control tests go green.  It covers the
case where a spawned process P1 itself spawns a child process C1, shares its
stdout/stderr file handles with C1, and then terminates.

Prior to this change, we would time out rather than detect the immediate
termination of P1, because we waited for the stdout/stderr file handles to be
closed, and C1 was still holding them open.

Now we wait on a condition variable that is notified by two threads: one
parked on subprocess.Popen.wait() and another parked on
subprocess.Popen.communicate().  This allows us to catch the scenario above.
This adds one thread per process (the one parked on wait()).  Both the wait()
and the communicate() threads block efficiently, so the added cost should be
minimal.

http://reviews.llvm.org/D13362

Files:
  test/test_runner/lib/process_control.py
  test/test_runner/test/inferior.py
  test/test_runner/test/process_control_tests.py

Index: test/test_runner/test/process_control_tests.py
===================================================================
--- test/test_runner/test/process_control_tests.py
+++ test/test_runner/test/process_control_tests.py
@@ -14,7 +14,6 @@
 
 # System imports.
 import os
-import platform
 import unittest
 import sys
 import threading
@@ -27,9 +26,11 @@
 
 
 class TestInferiorDriver(process_control.ProcessDriver):
-    def __init__(self, soft_terminate_timeout=None):
+    def __init__(self, soft_terminate_timeout=5.0):
+        # Must be a number: the soft-kill path waits this many seconds.
         super(TestInferiorDriver, self).__init__(
             soft_terminate_timeout=soft_terminate_timeout)
+
         self.started_event = threading.Event()
         self.started_event.clear()
 
@@ -105,10 +106,13 @@
     def test_run_completes_with_code(self):
         """Test that running completes and gets expected stdout/stderr."""
         driver = TestInferiorDriver()
-        driver.run_command(self.inferior_command(options="-r10"))
+        expected_returncode = 10
+        driver.run_command(self.inferior_command(options="-r{}".format(
+            expected_returncode)))
         self.assertTrue(
             driver.completed_event.wait(5), "process failed to complete")
-        self.assertEqual(driver.returncode, 10, "return code does not match")
+        self.assertIsNotNone(driver.returncode)
+        self.assertEqual(driver.returncode, expected_returncode)
 
 
 class ProcessControlTimeoutTests(ProcessControlTests):
@@ -204,27 +208,30 @@
         """
         driver = TestInferiorDriver()
 
+        timeout_seconds = 5
+        return_code = 3
         # Create the inferior (I1), and instruct it to create a child (C1)
         # that shares the stdout/stderr handles with the inferior.
         # C1 will then loop forever.
         driver.run_command_with_timeout(
             self.inferior_command(
-                options="--launch-child-share-handles --return-code 3"),
-            "5s",
+                options="--launch-child-share-handles --return-code {}".format(
+                    return_code)),
+            "{}s".format(timeout_seconds),
             False)
 
         # We should complete without a timetout.  I1 should end
         # immediately after launching C1.
         self.assertTrue(
-            driver.completed_event.wait(5),
+            driver.completed_event.wait(timeout_seconds),
             "process failed to complete")
 
         # Ensure we didn't receive a timeout.
-        self.assertTrue(
+        self.assertFalse(
             driver.was_timeout, "inferior should have completed normally")
 
         self.assertEqual(
-            driver.returncode, 3,
+            driver.returncode, return_code,
             "expected inferior process to end with expected returncode")
 
 
Index: test/test_runner/test/inferior.py
===================================================================
--- test/test_runner/test/inferior.py
+++ test/test_runner/test/inferior.py
@@ -140,4 +140,6 @@
     return options.return_code
 
 if __name__ == "__main__":
-    sys.exit(main(sys.argv[1:]))
+    RETURN_CODE = main(sys.argv[1:])
+    print("returning {}".format(RETURN_CODE))
+    sys.exit(RETURN_CODE)
Index: test/test_runner/lib/process_control.py
===================================================================
--- test/test_runner/lib/process_control.py
+++ test/test_runner/lib/process_control.py
@@ -13,6 +13,7 @@
 """
 
 # System imports
+import datetime
 import os
 import re
 import signal
@@ -21,31 +22,40 @@
 import threading
 
 
-class CommunicatorThread(threading.Thread):
-    """Provides a thread class that communicates with a subprocess."""
-    def __init__(self, process, event, output_file):
-        super(CommunicatorThread, self).__init__()
+class CallAndNotifyThread(threading.Thread):
+    """Provides a thread class that calls a method, then notifies.
+
+    Implements a thread that sits on a synchronous call, then notifies
+    the client via completion_func of either the call's result or, if
+    the call raised, the exception that was raised.
+    """
+
+    def __init__(self, call_func, completion_func):
+        """Initializes the CallAndNotifyThread object.
+
+        @param call_func the callable object that should be called as
+        result = call_func()
+
+        @param completion_func the callable object that will be called
+        as completion_func(call_func_result, None) if call_func returned,
+        or completion_func(None, exception) if call_func raised.
+        """
+        super(CallAndNotifyThread, self).__init__()
         # Don't let this thread prevent shutdown.
         self.daemon = True
-        self.process = process
-        self.pid = process.pid
-        self.event = event
-        self.output_file = output_file
-        self.output = None
+        self.call_func = call_func
+        self.completion_func = completion_func
 
     def run(self):
         try:
-            # Communicate with the child process.
-            # This will not complete until the child process terminates.
-            self.output = self.process.communicate()
+            result = self.call_func()
         except Exception as exception:  # pylint: disable=broad-except
-            if self.output_file:
-                self.output_file.write(
-                    "exception while using communicate() for pid: {}\n".format(
-                        exception))
-        finally:
-            # Signal that the thread's run is complete.
-            self.event.set()
+            # Notify client that we hit an exception while
+            # waiting for our synchronous call to complete.
+            self.completion_func(None, exception)
+        else:
+            # Synchronous method completed.  Notify client.
+            self.completion_func(result, None)
 
 
 # Provides a regular expression for matching gtimeout-based durations.
@@ -404,21 +414,34 @@
         super(ProcessDriver, self).__init__()
         self.process_helper = ProcessHelper.process_helper()
         self.pid = None
-        # Create the synchronization event for notifying when the
-        # inferior dotest process is complete.
-        self.done_event = threading.Event()
-        self.io_thread = None
+
         self.process = None
+
         # Number of seconds to wait for the soft terminate to
         # wrap up, before moving to more drastic measures.
         # Might want this longer if core dumps are generated and
         # take a long time to write out.
         self.soft_terminate_timeout = soft_terminate_timeout
+
         # Number of seconds to wait for the hard terminate to
         # wrap up, before giving up on the io thread.  This should
         # be fast.
         self.hard_terminate_timeout = 5.0
+
+        # Number of seconds to wait for the remaining completion event
+        # after one of wait()/communicate() has completed.  Typically
+        # wait() has returned but stdout/stderr are not yet closed,
+        # possibly because a child process shares those file descriptors.
+        # Used during normal (non-timeout) completion of the child
+        # process.
+        self.wait_for_all_completion_timeout = 5.0
+
+        # Guards self.returncode and self.output; notified when either is set.
+        self.ending_condition = threading.Condition()
+        self.wait_thread = None
         self.returncode = None
+        self.communicate_thread = None
+        self.output = None
 
     # =============================================
     # Methods for subclasses to override if desired.
@@ -444,23 +467,26 @@
     def run_command(self, command):
         # Start up the child process and the thread that does the
         # communication pump.
-        self._start_process_and_io_thread(command)
+        self._start_process_and_completion_threads(command)
+
+        # Wait for either the wait() or communicate() thread to
+        # complete, then give the other one a bounded time to finish.
+        self._wait_for_any_completion_event()
+        cleanup_result = self._wait_for_all_completion_events(
+            self.wait_for_all_completion_timeout)
+        if not cleanup_result:
+            self._raise_on_incomplete_state()
 
-        # Wait indefinitely for the child process to finish
-        # communicating.  This indicates it has closed stdout/stderr
-        # pipes and is done.
-        self.io_thread.join()
-        self.returncode = self.process.wait()
         if self.returncode is None:
             raise Exception(
                 "no exit status available for pid {} after the "
-                " inferior dotest.py should have completed".format(
+                "child process should have completed".format(
                     self.process.pid))
 
         # Notify of non-timeout exit.
         self.on_process_exited(
             command,
-            self.io_thread.output,
+            self.output,
             False,
             self.returncode)
 
@@ -470,7 +496,7 @@
 
         # Start up the child process and the thread that does the
         # communication pump.
-        self._start_process_and_io_thread(command)
+        self._start_process_and_completion_threads(command)
 
         self._wait_with_timeout(timeout_seconds, command, want_core)
 
@@ -478,20 +504,207 @@
     # Internal details.
     # ================
 
-    def _start_process_and_io_thread(self, command):
-        # Create the process.
+    def _raise_on_incomplete_state(self):
+        do_raise = False
+        self.ending_condition.acquire()
+        try:
+            if self.output is not None:
+                communicate_state = "completed"
+            else:
+                communicate_state = "incomplete"
+                do_raise = True
+            if self.returncode is not None:
+                wait_state = "completed"
+            else:
+                wait_state = "incomplete"
+                do_raise = True
+        finally:
+            self.ending_condition.release()
+
+        if do_raise:
+            raise Exception(
+                "failed waiting on one of wait() and communicate(): "
+                "communicate()={}, wait()={}".format(
+                    communicate_state, wait_state))
+
+    def _wait_completed(self, result, exception):
+        """Notifies that the wait() on the child process completed.
+
+        result will be non-None if there was no exception caught
+        while calling the method; otherwise, result will be None
+        and exception will be non-None, indicating the exception
+        thrown.
+
+        @param result the result of calling subprocess.Popen.wait().
+        This will be the returncode from wait(), unless an exception
+        occurred.  In that case, it will be None.
+
+        @param exception specifies the exception that occurred while
+        calling wait(); otherwise, None on normal operation.
+        """
+        # Report any errors.
+        if exception is not None:
+            self.write("caught exception during wait() on process: "
+                       "{}\n".format(exception))
+
+        # Grab the condition lock.
+        self.ending_condition.acquire()
+
+        try:
+            # Update our state.
+            if exception is not None:
+                # Save it as the returncode.  We'll check for
+                # exceptions when the condition variable is triggered.
+                # This allows us to wrap up, but also know we totally
+                # failed to reap the process (along with why).
+                self.returncode = exception
+            else:
+                self.returncode = result
+
+            # Notify that we have new state.
+            self.ending_condition.notifyAll()
+        finally:
+            # And release the lock.
+            self.ending_condition.release()
+
+    def _communicate_completed(self, output, exception):
+        """Notifies that the communicate() on the child process completed.
+
+        @param output the (stdout, stderr) from the child process; None
+        if an exception occurred while waiting for communicate() to return.
+
+        @param exception specifies the exception that occurred while
+        calling communicate(); otherwise, None on normal operation with
+        communicate() completing successfully.
+        """
+        # Report any errors.
+        if exception is not None:
+            self.write(
+                "caught exception during communicate() on process: "
+                "{}\n".format(exception))
+
+        # Grab the condition lock.
+        self.ending_condition.acquire()
+
+        try:
+            # Update our state.
+            if exception is None:
+                if output is None:
+                    # This will create some problems as we're
+                    # expecting self.output to have something (which
+                    # can be a tuple with empty strings, or an exception).
+                    # Note the issue, as something went awry if we
+                    # didn't get an exception *and* the return value
+                    # was None.  We'll later use self.output is None
+                    # to validate that the communicate() thread is
+                    # still doing work.
+                    output = ('', '')
+                    self.write(
+                        "unexpected: communicate() completed but return value "
+                        "was None, replacing with ('', '')\n")
+                self.output = output
+            else:
+                self.output = exception
+
+            # Notify that we have new state.
+            self.ending_condition.notifyAll()
+        finally:
+            # And release the lock.
+            self.ending_condition.release()
+
+    def _wait_for_any_completion_event(self, timeout=None):
+        """Waits for either wait() or communicate() to complete, or timeout.
+
+        @param timeout float representing the number of seconds to wait for
+        one of the completion events to occur.  May be None if no timeout
+        is desired, in which case it will wait indefinitely for one of them
+        to occur.
+
+        @return True if either of the completion events triggered the return,
+        False if the timeout was reached first.
+        """
+        if timeout is not None:
+            end_time = datetime.datetime.now() + datetime.timedelta(0, timeout)
+        else:
+            end_time = None
+
+        # Do the following while we haven't timed out (or don't have a timeout)
+        self.ending_condition.acquire()
+        try:
+            # While we haven't timed out, and none of the completion
+            # events have occurred, we'll wait.
+            while ((end_time is None or datetime.datetime.now() < end_time) and
+                   (self.returncode is None) and (self.output is None)):
+                # Wait in slices of at most 5 seconds, clamped to the time
+                # left before end_time, so the caller's timeout is honored.
+                wait_seconds = 5.0
+                if end_time is not None:
+                    left = (end_time - datetime.datetime.now()).total_seconds()
+                    wait_seconds = max(0.0, min(wait_seconds, left))
+                self.ending_condition.wait(wait_seconds)
+            return (self.returncode is not None) or (self.output is not None)
+        finally:
+            # Make sure we release the lock.
+            self.ending_condition.release()
+
+    def _wait_for_all_completion_events(self, timeout):
+        """Given that we received at least one of the completion events,
+        wait for the remaining.
+
+        The intent of this method is to provide a short, graceful collection
+        of the completion event that has not yet occurred.  e.g. if the
+        wait() returns but the communicate() hasn't fully drained the read
+        side of the pipe, we need the communicate to wrap up cleanly in order
+        to parse output properly.  The timeout should be short here, just
+        a few seconds should do.
+
+        Unlike _wait_for_any_completion_event(), this call requires a
+        timeout.
+
+        @param timeout a float indicating the maximum amount of time to wait
+        the remaining completion event.
+
+        @return True if all completion events are now complete; False
+        otherwise (i.e. a timeout occurred).
+        """
+        # Compute the deadline; this method always requires a timeout.
+        end_time = datetime.datetime.now() + datetime.timedelta(0, timeout)
+
+        self.ending_condition.acquire()
+        try:
+            # While we haven't timed out, and either of the completion
+            # events have not yet occurred, we'll wait.
+            while ((datetime.datetime.now() < end_time) and
+                   ((self.returncode is None) or (self.output is None))):
+                # Wait in slices of at most 1 second, clamped to the time
+                # left before end_time.  Callers (e.g. the soft and hard
+                # kill paths) rely on this timeout not being overshot
+                # before they escalate or give up.
+                left = (end_time - datetime.datetime.now()).total_seconds()
+                wait_seconds = max(0.0, min(1.0, left))
+                self.ending_condition.wait(wait_seconds)
+            return (self.returncode is not None) and (self.output is not None)
+        finally:
+            # Make sure we release the lock.
+            self.ending_condition.release()
+
+    def _start_process_and_completion_threads(self, command):
+        # Create and start the process.
         self.process = self.process_helper.create_piped_process(command)
         self.pid = self.process.pid
         self.on_process_started()
 
-        # Ensure the event is cleared that is used for signaling
-        # from the communication() thread when communication is
-        # complete (i.e. the inferior process has finished).
-        self.done_event.clear()
+        # Create the thread that waits on Popen.communicate()
+        self.communicate_thread = CallAndNotifyThread(
+            self.process.communicate,
+            self._communicate_completed)
+        self.communicate_thread.start()
 
-        self.io_thread = CommunicatorThread(
-            self.process, self.done_event, self.write)
-        self.io_thread.start()
+        # Create the thread that waits on Popen.wait()
+        self.wait_thread = CallAndNotifyThread(
+            self.process.wait,
+            self._wait_completed)
+        self.wait_thread.start()
 
     def _attempt_soft_kill(self, want_core):
         # The inferior dotest timed out.  Attempt to clean it
@@ -503,23 +716,14 @@
             want_core=want_core,
             log_file=self)
 
-        # Now wait up to a certain timeout period for the io thread
-        # to say that the communication ended.  If that wraps up
-        # within our soft terminate timeout, we're all done here.
-        self.io_thread.join(self.soft_terminate_timeout)
-        if not self.io_thread.is_alive():
-            # stdout/stderr were closed on the child process side. We
-            # should be able to wait and reap the child process here.
-            self.returncode = self.process.wait()
-            # We terminated, and the done_trying result is n/a
-            terminated = True
-            done_trying = None
-        else:
+        # Give wait() and communicate() up to soft_terminate_timeout to finish.
+        terminated = self._wait_for_all_completion_events(
+            self.soft_terminate_timeout)
+        if not terminated:
             self.write("soft kill attempt of process {} timed out "
                        "after {} seconds\n".format(
                            self.process.pid, self.soft_terminate_timeout))
-            terminated = False
-            done_trying = False
+        done_trying = False
         return terminated, done_trying
 
     def _attempt_hard_kill(self):
@@ -529,27 +733,11 @@
             self.process,
             log_file=self)
 
-        # Reap the child process.  This should not hang as the
-        # hard_kill() mechanism is supposed to really kill it.
-        # Improvement option:
-        # If this does ever hang, convert to a self.process.poll()
-        # loop checking on self.process.returncode until it is not
-        # None or the timeout occurs.
-        self.returncode = self.process.wait()
-
-        # Wait a few moments for the io thread to finish...
-        self.io_thread.join(self.hard_terminate_timeout)
-        if self.io_thread.is_alive():
-            # ... but this is not critical if it doesn't end for some
-            # reason.
-            self.write(
-                "hard kill of process {} timed out after {} seconds waiting "
-                "for the io thread (ignoring)\n".format(
-                    self.process.pid, self.hard_terminate_timeout))
+        # Give wait() and communicate() up to hard_terminate_timeout to finish.
+        terminated = self._wait_for_all_completion_events(
+            self.hard_terminate_timeout)
 
-        # Set if it terminated.  (Set up for optional improvement above).
-        terminated = self.returncode is not None
-        # Nothing else to try.
+        # A hard kill is the last resort; there is nothing left to try.
         done_trying = True
 
         return terminated, done_trying
@@ -565,7 +753,9 @@
                 return self._attempt_hard_kill()
             else:
                 # We don't have anything else to try.
-                terminated = self.returncode is not None
+                terminated = (
+                    self.returncode is not None and
+                    self.output is not None)
                 done_trying = True
                 return terminated, done_trying
         else:
@@ -575,20 +765,43 @@
                 return self._attempt_hard_kill()
             else:
                 # We don't have anything else to try.
-                terminated = self.returncode is not None
+                terminated = (
+                    self.returncode is not None and
+                    self.output is not None)
                 done_trying = True
                 return terminated, done_trying
 
     def _wait_with_timeout(self, timeout_seconds, command, want_core):
         # Allow up to timeout seconds for the io thread to wrap up.
         # If that completes, the child process should be done.
-        completed_normally = self.done_event.wait(timeout_seconds)
-        if completed_normally:
-            # Reap the child process here.
-            self.returncode = self.process.wait()
+        one_event_received = self._wait_for_any_completion_event(
+            timeout=timeout_seconds)
+        if one_event_received:
+            # Make sure we get the other completion event.  At most
+            # this should need to wait a few seconds for output or reaping
+            # to complete.  If it takes longer, the process group leader
+            # may have exited while a child it spawned still holds its
+            # stdout/stderr pipes open.
+            all_events_received = self._wait_for_all_completion_events(
+                self.wait_for_all_completion_timeout)
         else:
-            # Prepare to stop the process
-            process_terminated = completed_normally
+            all_events_received = False
+
+        if not all_events_received:
+            # Prepare to stop the process/process group.  We're doing
+            # this because one of two cases happened:
+            # 1) the child process launched is still running (the
+            #    obvious case)
+            # or
+            # 2) the child process finished, but it shared out its
+            #    stdout/stderr with a child it spawned, which is still
+            #    running.
+            #
+            # In case 2, the process itself is done, but we need to
+            # terminate its children.  As long as we're using
+            # process groups, we'll be fine terminating the whole
+            # process group.
+            process_terminated = False
             terminate_attempt_count = 0
 
             # Try as many attempts as we support for trying to shut down
@@ -608,6 +821,8 @@
         # attempts, or we failed but gave it our best effort.
         self.on_process_exited(
             command,
-            self.io_thread.output,
-            not completed_normally,
+            self.output,
+            # Report a timeout only if neither wait() nor communicate()
+            # completed within timeout_seconds.
+            not one_event_received,
             self.returncode)
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to