tfiala created this revision. tfiala added reviewers: clayborg, zturner. tfiala added a subscriber: lldb-commits.
This change does the following: * If Ctrl-C is hit once, the parallel dotest.py runner will stop any further work from being started, but will wait for the existing tests in progress to complete. * If Ctrl-C is hit a second time, the parallel dotest.py will kill the process trees for the child worker threads and the inferior dotest.py processes they spawned. In both cases, the report infrastructure is left intact to report on whatever work was completed before it stopped. Thus, if 42 tests ran when the test was keyboard interrupted, the results for those 42 tests will be displayed. See https://llvm.org/bugs/show_bug.cgi?id=24709 http://reviews.llvm.org/D12651 Files: test/dosep.py
Index: test/dosep.py =================================================================== --- test/dosep.py +++ test/dosep.py @@ -36,8 +36,10 @@ import os import fnmatch import platform +import Queue import re import dotest_args +import signal import subprocess import sys @@ -142,7 +144,7 @@ return passes, failures, unexpected_successes -def call_with_timeout(command, timeout, name): +def call_with_timeout(command, timeout, name, inferior_pid_events): """Run command with a timeout if possible.""" """-s QUIT will create a coredump if they are enabled on your system""" process = None @@ -161,8 +163,12 @@ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + inferior_pid = process.pid + inferior_pid_events.put_nowait(('created', inferior_pid)) output = process.communicate() exit_status = process.returncode + inferior_pid_events.put_nowait(('destroyed', inferior_pid)) + passes, failures, unexpected_successes = parse_test_results(output) if exit_status == 0: # stdout does not have any useful information from 'dotest.py', @@ -173,7 +179,7 @@ return name, exit_status, passes, failures, unexpected_successes -def process_dir(root, files, test_root, dotest_argv): +def process_dir(root, files, test_root, dotest_argv, inferior_pid_events): """Examine a directory for tests, and invoke any found within it.""" results = [] for name in files: @@ -187,7 +193,8 @@ timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or getDefaultTimeout(dotest_options.lldb_platform_name)) - results.append(call_with_timeout(command, timeout, name)) + results.append(call_with_timeout( + command, timeout, name, inferior_pid_events)) # result = (name, status, passes, failures, unexpected_successes) timed_out = [name for name, status, _, _, _ in results @@ -208,10 +215,73 @@ out_q = None -def process_dir_worker(arg_tuple): +def process_dir_worker(a_output_lock, a_test_counter, a_total_tests, + a_test_name_len, a_dotest_options, job_queue, + result_queue, inferior_pid_events): """Worker thread main loop when in multithreaded mode. Takes one directory specification at a time and works on it.""" - return process_dir(*arg_tuple) + + # Shut off interrupt handling in the child process. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # Setup the global state for the worker process. + setup_global_variables( + a_output_lock, a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options) + + # Keep grabbing entries from the queue until done. + while not job_queue.empty(): + try: + job = job_queue.get(block=False) + result = process_dir(job[0], job[1], job[2], job[3], + inferior_pid_events) + result_queue.put(result) + except Queue.Empty: + # Fine, we're done. + pass + + +def collect_active_pids_from_pid_events(event_queue): + """ + Returns the set of what should be active inferior pids based on + the event stream. + + @param event_queue a multiprocessing.Queue containing events of the + form: + ('created', pid) + ('destroyed', pid) + + @return set of inferior dotest.py pids activated but never completed. + """ + active_pid_set = set() + while not event_queue.empty(): + pid_event = event_queue.get_nowait() + if pid_event[0] == 'created': + active_pid_set.add(pid_event[1]) + elif pid_event[0] == 'destroyed': + active_pid_set.remove(pid_event[1]) + return active_pid_set + + +def kill_all_worker_processes(workers, inferior_pid_events): + """ + Kills all specified worker processes and their process tree. + + @param workers a list of multiprocess.Process worker objects. + @param inferior_pid_events a multiprocess.Queue that contains + all inferior create and destroy events. Used to construct + the list of child pids still outstanding that need to be killed. + """ + for worker in workers: + worker.terminate() + worker.join() + + # Add all the child test pids created. + active_pid_set = collect_active_pids_from_pid_events( + inferior_pid_events) + for inferior_pid in active_pid_set: + print "killing inferior pid {}".format(inferior_pid) + os.kill(inferior_pid, signal.SIGKILL) def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads): @@ -236,7 +306,8 @@ return name.startswith("Test") and name.endswith(".py") tests = filter(is_test, files) - test_work_items.append((root, tests, test_directory, dotest_argv)) + test_work_items.append( + (root, tests, test_directory, dotest_argv, None)) global output_lock, test_counter, total_tests, test_name_len output_lock = multiprocessing.RLock() @@ -251,12 +322,65 @@ # Run the items, either in a pool (for multicore speedup) or # calling each individually. if num_threads > 1: - pool = multiprocessing.Pool( - num_threads, - initializer=setup_global_variables, - initargs=(output_lock, test_counter, total_tests, test_name_len, - dotest_options)) - test_results = pool.map(process_dir_worker, test_work_items) + # Create jobs. + job_queue = multiprocessing.Queue() + for test_work_item in test_work_items: + job_queue.put(test_work_item) + + result_queue = multiprocessing.Queue() + + # Create queues for started child pids. Terminating + # the multiprocess processes does not terminate the + # child processes they spawn. We can remove this tracking + # if/when we move to having the multiprocess process directly + # perform the test logic. The Queue size needs to be able to + # hold 2 * (num inferior dotest.py processes started) entries. + inferior_pid_events = multiprocessing.Queue(4096) + + # Create workers. We don't use multiprocessing.Pool due to + # challenges with handling ^C keyboard interrupts. + workers = [] + for _ in range(num_threads): + worker = multiprocessing.Process( + target=process_dir_worker, + args=(output_lock, + test_counter, + total_tests, + test_name_len, + dotest_options, + job_queue, + result_queue, + inferior_pid_events)) + worker.start() + workers.append(worker) + + # Wait for all workers to finish, handling ^C as needed. + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + # First try to drain the queue of work and let the + # running tests complete. + while not job_queue.empty(): + try: + # Just drain it to stop more work from being started. + job_queue.get_nowait() + except Queue.Empty: + pass + + print ('\nFirst KeyboardInterrupt received, stopping ' + 'future work. Press again to hard-stop existing tests.') + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + print ('\nSecond KeyboardInterrupt received, killing ' + 'all worker process trees.') + kill_all_worker_processes(workers, inferior_pid_events) + + test_results = [] + while not result_queue.empty(): + test_results.append(result_queue.get(block=False)) else: test_results = map(process_dir_worker, test_work_items)
_______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits