tfiala updated this revision to Diff 34089.
tfiala added a comment.

Specify tight queue sizes for the job description queue and the job results 
queue.

This *might* stop unintentional blocking when items are added to a queue
that was sized too small by default, which could explain a behavioral
difference between Windows and OS X/Linux.


http://reviews.llvm.org/D12651

Files:
  test/dosep.py

Index: test/dosep.py
===================================================================
--- test/dosep.py
+++ test/dosep.py
@@ -36,8 +36,10 @@
 import os
 import fnmatch
 import platform
+import Queue
 import re
 import dotest_args
+import signal
 import subprocess
 import sys
 
@@ -142,7 +144,7 @@
     return passes, failures, unexpected_successes
 
 
-def call_with_timeout(command, timeout, name):
+def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible."""
     """-s QUIT will create a coredump if they are enabled on your system"""
     process = None
@@ -161,8 +163,12 @@
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
+    inferior_pid = process.pid
+    inferior_pid_events.put_nowait(('created', inferior_pid))
     output = process.communicate()
     exit_status = process.returncode
+    inferior_pid_events.put_nowait(('destroyed', inferior_pid))
+
     passes, failures, unexpected_successes = parse_test_results(output)
     if exit_status == 0:
         # stdout does not have any useful information from 'dotest.py',
@@ -173,7 +179,7 @@
     return name, exit_status, passes, failures, unexpected_successes
 
 
-def process_dir(root, files, test_root, dotest_argv):
+def process_dir(root, files, test_root, dotest_argv, inferior_pid_events):
     """Examine a directory for tests, and invoke any found within it."""
     results = []
     for name in files:
@@ -187,7 +193,8 @@
         timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or
                    getDefaultTimeout(dotest_options.lldb_platform_name))
 
-        results.append(call_with_timeout(command, timeout, name))
+        results.append(call_with_timeout(
+            command, timeout, name, inferior_pid_events))
 
     # result = (name, status, passes, failures, unexpected_successes)
     timed_out = [name for name, status, _, _, _ in results
@@ -208,10 +215,73 @@
 out_q = None
 
 
-def process_dir_worker(arg_tuple):
+def process_dir_worker(a_output_lock, a_test_counter, a_total_tests,
+                       a_test_name_len, a_dotest_options, job_queue,
+                       result_queue, inferior_pid_events):
     """Worker thread main loop when in multithreaded mode.
     Takes one directory specification at a time and works on it."""
-    return process_dir(*arg_tuple)
+
+    # Shut off interrupt handling in the child process.
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    # Setup the global state for the worker process.
+    setup_global_variables(
+        a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
+        a_dotest_options)
+
+    # Keep grabbing entries from the queue until done.
+    while not job_queue.empty():
+        try:
+            job = job_queue.get(block=False)
+            result = process_dir(job[0], job[1], job[2], job[3],
+                                 inferior_pid_events)
+            result_queue.put(result)
+        except Queue.Empty:
+            # Another worker took the last job; the while loop will exit.
+            pass
+
+
+def collect_active_pids_from_pid_events(event_queue):
+    """
+    Returns the set of what should be active inferior pids based on
+    the event stream.
+
+    @param event_queue a multiprocessing.Queue containing events of the
+    form:
+         ('created', pid)
+         ('destroyed', pid)
+
+    @return set of inferior dotest.py pids activated but never completed.
+    """
+    active_pid_set = set()
+    while not event_queue.empty():
+        pid_event = event_queue.get_nowait()
+        if pid_event[0] == 'created':
+            active_pid_set.add(pid_event[1])
+        elif pid_event[0] == 'destroyed':
+            active_pid_set.remove(pid_event[1])
+    return active_pid_set
+
+
+def kill_all_worker_processes(workers, inferior_pid_events):
+    """
+    Kills all specified worker processes and their process tree.
+
+    @param workers a list of multiprocessing.Process worker objects.
+    @param inferior_pid_events a multiprocessing.Queue that contains
+    all inferior create and destroy events.  Used to construct
+    the list of child pids still outstanding that need to be killed.
+    """
+    for worker in workers:
+        worker.terminate()
+        worker.join()
+
+    # Add all the child test pids created.
+    active_pid_set = collect_active_pids_from_pid_events(
+        inferior_pid_events)
+    for inferior_pid in active_pid_set:
+        print "killing inferior pid {}".format(inferior_pid)
+        os.kill(inferior_pid, signal.SIGKILL)
 
 
 def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads):
@@ -236,7 +306,8 @@
             return name.startswith("Test") and name.endswith(".py")
 
         tests = filter(is_test, files)
-        test_work_items.append((root, tests, test_directory, dotest_argv))
+        test_work_items.append(
+            (root, tests, test_directory, dotest_argv, None))
 
     global output_lock, test_counter, total_tests, test_name_len
     output_lock = multiprocessing.RLock()
@@ -251,12 +322,65 @@
     # Run the items, either in a pool (for multicore speedup) or
     # calling each individually.
     if num_threads > 1:
-        pool = multiprocessing.Pool(
-            num_threads,
-            initializer=setup_global_variables,
-            initargs=(output_lock, test_counter, total_tests, test_name_len,
-                      dotest_options))
-        test_results = pool.map(process_dir_worker, test_work_items)
+        # Create jobs.
+        job_queue = multiprocessing.Queue(len(test_work_items))
+        for test_work_item in test_work_items:
+            job_queue.put(test_work_item)
+
+        result_queue = multiprocessing.Queue(len(test_work_items))
+
+        # Create queues for started child pids.  Terminating
+        # the multiprocess processes does not terminate the
+        # child processes they spawn.  We can remove this tracking
+        # if/when we move to having the multiprocess process directly
+        # perform the test logic.  The Queue size needs to be able to
+        # hold 2 * (num inferior dotest.py processes started) entries.
+        inferior_pid_events = multiprocessing.Queue(4096)
+
+        # Create workers.  We don't use multiprocessing.Pool due to
+        # challenges with handling ^C keyboard interrupts.
+        workers = []
+        for _ in range(num_threads):
+            worker = multiprocessing.Process(
+                target=process_dir_worker,
+                args=(output_lock,
+                      test_counter,
+                      total_tests,
+                      test_name_len,
+                      dotest_options,
+                      job_queue,
+                      result_queue,
+                      inferior_pid_events))
+            worker.start()
+            workers.append(worker)
+
+        # Wait for all workers to finish, handling ^C as needed.
+        try:
+            for worker in workers:
+                worker.join()
+        except KeyboardInterrupt:
+            # First try to drain the queue of work and let the
+            # running tests complete.
+            while not job_queue.empty():
+                try:
+                    # Just drain it to stop more work from being started.
+                    job_queue.get_nowait()
+                except Queue.Empty:
+                    pass
+
+            print ('\nFirst KeyboardInterrupt received, stopping '
+                   'future work.  Press again to hard-stop existing tests.')
+            try:
+                for worker in workers:
+                    worker.join()
+            except KeyboardInterrupt:
+                print ('\nSecond KeyboardInterrupt received, killing '
+                       'all worker process trees.')
+                kill_all_worker_processes(workers, inferior_pid_events)
+
+        test_results = []
+        while not result_queue.empty():
+            test_results.append(result_queue.get(block=False))
     else:
         test_results = map(process_dir_worker, test_work_items)
 
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to