tfiala updated this revision to Diff 34089.
tfiala added a comment.
Specify tight queue sizes for the job description queue and the job results
queue.
This *might* stop unintentional queue blocking when adding items to the queue
if they were sized too small by default. This might account for a
difference in behavior between Windows and OS X/Linux.
http://reviews.llvm.org/D12651
Files:
test/dosep.py
Index: test/dosep.py
===================================================================
--- test/dosep.py
+++ test/dosep.py
@@ -36,8 +36,10 @@
import os
import fnmatch
import platform
+import Queue
import re
import dotest_args
+import signal
import subprocess
import sys
@@ -142,7 +144,7 @@
return passes, failures, unexpected_successes
-def call_with_timeout(command, timeout, name):
+def call_with_timeout(command, timeout, name, inferior_pid_events):
"""Run command with a timeout if possible."""
"""-s QUIT will create a coredump if they are enabled on your system"""
process = None
@@ -161,8 +163,12 @@
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ inferior_pid = process.pid
+ inferior_pid_events.put_nowait(('created', inferior_pid))
output = process.communicate()
exit_status = process.returncode
+ inferior_pid_events.put_nowait(('destroyed', inferior_pid))
+
passes, failures, unexpected_successes = parse_test_results(output)
if exit_status == 0:
# stdout does not have any useful information from 'dotest.py',
@@ -173,7 +179,7 @@
return name, exit_status, passes, failures, unexpected_successes
-def process_dir(root, files, test_root, dotest_argv):
+def process_dir(root, files, test_root, dotest_argv, inferior_pid_events):
"""Examine a directory for tests, and invoke any found within it."""
results = []
for name in files:
@@ -187,7 +193,8 @@
timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or
getDefaultTimeout(dotest_options.lldb_platform_name))
- results.append(call_with_timeout(command, timeout, name))
+ results.append(call_with_timeout(
+ command, timeout, name, inferior_pid_events))
# result = (name, status, passes, failures, unexpected_successes)
timed_out = [name for name, status, _, _, _ in results
@@ -208,10 +215,73 @@
out_q = None
-def process_dir_worker(arg_tuple):
+def process_dir_worker(a_output_lock, a_test_counter, a_total_tests,
+ a_test_name_len, a_dotest_options, job_queue,
+ result_queue, inferior_pid_events):
"""Worker thread main loop when in multithreaded mode.
Takes one directory specification at a time and works on it."""
- return process_dir(*arg_tuple)
+
+ # Shut off interrupt handling in the child process.
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Setup the global state for the worker process.
+ setup_global_variables(
+ a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
+ a_dotest_options)
+
+ # Keep grabbing entries from the queue until done.
+ while not job_queue.empty():
+ try:
+ job = job_queue.get(block=False)
+ result = process_dir(job[0], job[1], job[2], job[3],
+ inferior_pid_events)
+ result_queue.put(result)
+ except Queue.Empty:
+ # Fine, we're done.
+ pass
+
+
+def collect_active_pids_from_pid_events(event_queue):
+ """
+ Returns the set of what should be active inferior pids based on
+ the event stream.
+
+ @param event_queue a multiprocessing.Queue containing events of the
+ form:
+ ('created', pid)
+ ('destroyed', pid)
+
+ @return set of inferior dotest.py pids activated but never completed.
+ """
+ active_pid_set = set()
+ while not event_queue.empty():
+ pid_event = event_queue.get_nowait()
+ if pid_event[0] == 'created':
+ active_pid_set.add(pid_event[1])
+ elif pid_event[0] == 'destroyed':
+ active_pid_set.remove(pid_event[1])
+ return active_pid_set
+
+
+def kill_all_worker_processes(workers, inferior_pid_events):
+ """
+ Kills all specified worker processes and their process tree.
+
+ @param workers a list of multiprocess.Process worker objects.
+ @param inferior_pid_events a multiprocess.Queue that contains
+ all inferior create and destroy events. Used to construct
+ the list of child pids still outstanding that need to be killed.
+ """
+ for worker in workers:
+ worker.terminate()
+ worker.join()
+
+ # Add all the child test pids created.
+ active_pid_set = collect_active_pids_from_pid_events(
+ inferior_pid_events)
+ for inferior_pid in active_pid_set:
+ print "killing inferior pid {}".format(inferior_pid)
+ os.kill(inferior_pid, signal.SIGKILL)
def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads):
@@ -236,7 +306,8 @@
return name.startswith("Test") and name.endswith(".py")
tests = filter(is_test, files)
- test_work_items.append((root, tests, test_directory, dotest_argv))
+ test_work_items.append(
+ (root, tests, test_directory, dotest_argv, None))
global output_lock, test_counter, total_tests, test_name_len
output_lock = multiprocessing.RLock()
@@ -251,12 +322,65 @@
# Run the items, either in a pool (for multicore speedup) or
# calling each individually.
if num_threads > 1:
- pool = multiprocessing.Pool(
- num_threads,
- initializer=setup_global_variables,
- initargs=(output_lock, test_counter, total_tests, test_name_len,
- dotest_options))
- test_results = pool.map(process_dir_worker, test_work_items)
+ # Create jobs.
+ job_queue = multiprocessing.Queue(len(test_work_items))
+ for test_work_item in test_work_items:
+ job_queue.put(test_work_item)
+
+ result_queue = multiprocessing.Queue(len(test_work_items))
+
+ # Create queues for started child pids. Terminating
+ # the multiprocess processes does not terminate the
+ # child processes they spawn. We can remove this tracking
+ # if/when we move to having the multiprocess process directly
+ # perform the test logic. The Queue size needs to be able to
+ # hold 2 * (num inferior dotest.py processes started) entries.
+ inferior_pid_events = multiprocessing.Queue(4096)
+
+ # Create workers. We don't use multiprocessing.Pool due to
+ # challenges with handling ^C keyboard interrupts.
+ workers = []
+ for _ in range(num_threads):
+ worker = multiprocessing.Process(
+ target=process_dir_worker,
+ args=(output_lock,
+ test_counter,
+ total_tests,
+ test_name_len,
+ dotest_options,
+ job_queue,
+ result_queue,
+ inferior_pid_events))
+ worker.start()
+ workers.append(worker)
+
+ # Wait for all workers to finish, handling ^C as needed.
+ try:
+ for worker in workers:
+ worker.join()
+ except KeyboardInterrupt:
+ # First try to drain the queue of work and let the
+ # running tests complete.
+ while not job_queue.empty():
+ try:
+ # Just drain it to stop more work from being started.
+ job_queue.get_nowait()
+ except Queue.Empty:
+ pass
+
+ print ('\nFirst KeyboardInterrupt received, stopping '
+ 'future work. Press again to hard-stop existing tests.')
+ try:
+ for worker in workers:
+ worker.join()
+ except KeyboardInterrupt:
+ print ('\nSecond KeyboardInterrupt received, killing '
+ 'all worker process trees.')
+ kill_all_worker_processes(workers, inferior_pid_events)
+
+ test_results = []
+ while not result_queue.empty():
+ test_results.append(result_queue.get(block=False))
else:
test_results = map(process_dir_worker, test_work_items)
_______________________________________________
lldb-commits mailing list
[email protected]
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits