tfiala added a subscriber: dawn. tfiala updated this revision to Diff 34112. tfiala added a comment.
The latest patch does the following: - adds a test runner strategy layer for the isolated, concurrent test support (in dosep.py). The currently-supported test runners are: 1. multiprocessing: This is a multiprocessing-based parallel test runner that supports Ctrl-C by rolling its own worker pool. This one is currently known to have an issue on Windows. 2. multiprocessing-pool: This one is exactly like what previously existed - multiprocessing-based, using multiprocessing.Pool() to do the work. It does not handle Ctrl-C at all (and performs poorly, just like before, when the user does press Ctrl-C). 3. serial: This one is the test runner that does no concurrent test running but does use subprocess to provide process isolation for each of the test suite files. Users are not expected to specify --test-runner-name; sane defaults are provided. The defaults used when no test runner name is given are: - Use serial if the number of threads determined to be used is 1. (On modern hardware this is most likely because of the --threads argument.) - If os.name is 'nt', use multiprocessing-pool. - Otherwise, use multiprocessing. @Zachary - this should provide the net result of 'no behavior change' on Windows. If you have a chance to verify that with no special options, that'd be great. Note I plan to make one more change to add in the threading-based test runner before I'm totally done here, so this is just WIP. Zachary, if you want to wait for the "threading" test runner, that seems reasonable. I hope to have that diff in later today. @Dawn, I did add in the logic that says 'if you specify -v and are using the dosep-style test runner, assume --output-on-success is specified'. http://reviews.llvm.org/D12651 Files: test/dosep.py test/dotest.py test/dotest_args.py
Index: test/dotest_args.py =================================================================== --- test/dotest_args.py +++ test/dotest_args.py @@ -128,11 +128,17 @@ dest='num_threads', help=('The number of threads/processes to use when running tests ' 'separately, defaults to the number of CPU cores available')) - parser.add_argument( + group.add_argument( '--test-subdir', action='store', help='Specify a test subdirectory to use relative to the test root dir' ) + group.add_argument( + '--test-runner-name', + action='store', + help=('Specify a test runner strategy. Valid values: multiprocessing,' + ' multiprocessing-pool, serial') + ) # Remove the reference to our helper function del X Index: test/dotest.py =================================================================== --- test/dotest.py +++ test/dotest.py @@ -249,6 +249,7 @@ num_threads = None output_on_success = False no_multiprocess_test_runner = False +test_runner_name = None def usage(parser): parser.print_help() @@ -495,6 +496,7 @@ global num_threads global output_on_success global no_multiprocess_test_runner + global test_runner_name do_help = False @@ -756,7 +758,8 @@ if args.inferior: is_inferior_test_runner = True - if args.output_on_success: + # Turn on output_on_sucess if either explicitly added or -v specified. + if args.output_on_success or args.v: output_on_success = True if args.num_threads: @@ -765,6 +768,9 @@ if args.test_subdir: multiprocess_test_subdir = args.test_subdir + if args.test_runner_name: + test_runner_name = args.test_runner_name + if args.lldb_platform_name: lldb_platform_name = args.lldb_platform_name if args.lldb_platform_url: @@ -1278,7 +1284,8 @@ # multiprocess test runner here. 
if isMultiprocessTestRunner(): import dosep - dosep.main(output_on_success, num_threads, multiprocess_test_subdir) + dosep.main(output_on_success, num_threads, multiprocess_test_subdir, + test_runner_name) raise "should never get here" setupSysPath() Index: test/dosep.py =================================================================== --- test/dosep.py +++ test/dosep.py @@ -32,15 +32,18 @@ echo core.%p | sudo tee /proc/sys/kernel/core_pattern """ +import fnmatch import multiprocessing import os -import fnmatch import platform +import Queue import re -import dotest_args +import signal import subprocess import sys +import dotest_args + from optparse import OptionParser @@ -142,7 +145,7 @@ return passes, failures, unexpected_successes -def call_with_timeout(command, timeout, name): +def call_with_timeout(command, timeout, name, inferior_pid_events): """Run command with a timeout if possible.""" """-s QUIT will create a coredump if they are enabled on your system""" process = None @@ -161,8 +164,14 @@ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + inferior_pid = process.pid + if inferior_pid_events: + inferior_pid_events.put_nowait(('created', inferior_pid)) output = process.communicate() exit_status = process.returncode + if inferior_pid_events: + inferior_pid_events.put_nowait(('destroyed', inferior_pid)) + passes, failures, unexpected_successes = parse_test_results(output) if exit_status == 0: # stdout does not have any useful information from 'dotest.py', @@ -173,7 +182,7 @@ return name, exit_status, passes, failures, unexpected_successes -def process_dir(root, files, test_root, dotest_argv): +def process_dir(root, files, test_root, dotest_argv, inferior_pid_events): """Examine a directory for tests, and invoke any found within it.""" results = [] for name in files: @@ -187,7 +196,8 @@ timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or getDefaultTimeout(dotest_options.lldb_platform_name)) - 
results.append(call_with_timeout(command, timeout, name)) + results.append(call_with_timeout( + command, timeout, name, inferior_pid_events)) # result = (name, status, passes, failures, unexpected_successes) timed_out = [name for name, status, _, _, _ in results @@ -208,39 +218,129 @@ out_q = None -def process_dir_worker(arg_tuple): - """Worker thread main loop when in multithreaded mode. +def process_dir_worker_multiprocessing( + a_output_lock, a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options, job_queue, result_queue, inferior_pid_events): + """Worker thread main loop when in multiprocessing mode. Takes one directory specification at a time and works on it.""" - return process_dir(*arg_tuple) + # Shut off interrupt handling in the child process. + signal.signal(signal.SIGINT, signal.SIG_IGN) -def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads): - """Look for matched files and invoke test driver on each one. - In single-threaded mode, each test driver is invoked directly. - In multi-threaded mode, submit each test driver to a worker - queue, and then wait for all to complete. + # Setup the global state for the worker process. + setup_global_variables( + a_output_lock, a_test_counter, a_total_tests, a_test_name_len, + a_dotest_options) - test_directory - lldb/test/ directory - test_subdir - lldb/test/ or a subfolder with the tests we're interested in - running + # Keep grabbing entries from the queue until done. + while not job_queue.empty(): + try: + job = job_queue.get(block=False) + result = process_dir(job[0], job[1], job[2], job[3], + inferior_pid_events) + result_queue.put(result) + except Queue.Empty: + # Fine, we're done. + pass + + +def process_dir_worker_multiprocessing_pool(args): + process_dir(*args) + + +def process_dir_mapper_inprocess(args): + """Map adapter for running the subprocess-based test runner. 
+ + @param args the process work item tuple + @return the test result tuple +""" + return process_dir(*args) + + +def collect_active_pids_from_pid_events(event_queue): """ + Returns the set of what should be active inferior pids based on + the event stream. - # Collect the test files that we'll run. - test_work_items = [] - for root, dirs, files in os.walk(test_subdir, topdown=False): - def is_test(name): + @param event_queue a multiprocessing.Queue containing events of the + form: + ('created', pid) + ('destroyed', pid) + + @return set of inferior dotest.py pids activated but never completed. + """ + active_pid_set = set() + while not event_queue.empty(): + pid_event = event_queue.get_nowait() + if pid_event[0] == 'created': + active_pid_set.add(pid_event[1]) + elif pid_event[0] == 'destroyed': + active_pid_set.remove(pid_event[1]) + return active_pid_set + + +def kill_all_worker_processes(workers, inferior_pid_events): + """ + Kills all specified worker processes and their process tree. + + @param workers a list of multiprocess.Process worker objects. + @param inferior_pid_events a multiprocess.Queue that contains + all inferior create and destroy events. Used to construct + the list of child pids still outstanding that need to be killed. + """ + for worker in workers: + worker.terminate() + worker.join() + + # Add all the child test pids created. + active_pid_set = collect_active_pids_from_pid_events( + inferior_pid_events) + for inferior_pid in active_pid_set: + print "killing inferior pid {}".format(inferior_pid) + os.kill(inferior_pid, signal.SIGKILL) + + +def find_test_files_in_dir_tree(dir_root, found_func): + """Calls found_func for all the test files in the given dir hierarchy. + + @param dir_root the path to the directory to start scanning + for test files. All files in this directory and all its children + directory trees will be searched. 
+ + @param found_func a callable object that will be passed + the parent directory (relative to dir_root) and the list of + test files from within that directory. + """ + for root, _, files in os.walk(dir_root, topdown=False): + def is_test_filename(test_dir, base_filename): + """Returns True if the given filename matches the test name format. + + @param test_dir the directory to check. Should be absolute or + relative to current working directory. + + @param base_filename the base name of the filename to check for a + dherence to the python test case filename format. + + @return True if name matches the python test case filename format. + """ # Not interested in symbolically linked files. - if os.path.islink(os.path.join(root, name)): + if os.path.islink(os.path.join(test_dir, base_filename)): return False # Only interested in test files with the "Test*.py" naming pattern. - return name.startswith("Test") and name.endswith(".py") + return (base_filename.startswith("Test") and + base_filename.endswith(".py")) + + tests = [filename for filename in files + if is_test_filename(root, filename)] + if tests: + found_func(root, tests) - tests = filter(is_test, files) - test_work_items.append((root, tests, test_directory, dotest_argv)) +def initialize_global_vars_multiprocessing(num_threads, test_work_items): + # Initialize the global state we'll use to communicate with the + # rest of the flat module. global output_lock, test_counter, total_tests, test_name_len output_lock = multiprocessing.RLock() - # item = (root, tests, test_directory, dotest_argv) total_tests = sum([len(item[1]) for item in test_work_items]) test_counter = multiprocessing.Value('i', 0) test_name_len = multiprocessing.Value('i', 0) @@ -248,19 +348,129 @@ total_tests, num_threads, (num_threads > 1) * "s") update_progress() - # Run the items, either in a pool (for multicore speedup) or - # calling each individually. 
- if num_threads > 1: - pool = multiprocessing.Pool( - num_threads, - initializer=setup_global_variables, - initargs=(output_lock, test_counter, total_tests, test_name_len, - dotest_options)) - test_results = pool.map(process_dir_worker, test_work_items) - else: - test_results = map(process_dir_worker, test_work_items) - # result = (timed_out, failed, passed, unexpected_successes, fail_count, pass_count) +def multiprocessing_test_runner(num_threads, test_work_items): + """Provides multiprocessing.Pool.map() test runner adapter. + + This concurrent test runner is based on the multiprocessing + library, and rolls its own worker pooling strategy so it + can handle Ctrl-C properly. + + This test runner is known to have an issue running on + Windows platforms. + + @param num_threads the number of worker processes to use. + + @param test_work_items the iterable of test work item tuples + to run. + """ + + # Initialize our global state. + initialize_global_vars_multiprocessing(num_threads, test_work_items) + + # Create jobs. + job_queue = multiprocessing.Queue(len(test_work_items)) + for test_work_item in test_work_items: + job_queue.put(test_work_item) + + result_queue = multiprocessing.Queue(len(test_work_items)) + + # Create queues for started child pids. Terminating + # the multiprocess processes does not terminate the + # child processes they spawn. We can remove this tracking + # if/when we move to having the multiprocess process directly + # perform the test logic. The Queue size needs to be able to + # hold 2 * (num inferior dotest.py processes started) entries. + inferior_pid_events = multiprocessing.Queue(4096) + + # Create workers. We don't use multiprocessing.Pool due to + # challenges with handling ^C keyboard interrupts. 
+ workers = [] + for _ in range(num_threads): + worker = multiprocessing.Process( + target=process_dir_worker_multiprocessing, + args=(output_lock, + test_counter, + total_tests, + test_name_len, + dotest_options, + job_queue, + result_queue, + inferior_pid_events)) + worker.start() + workers.append(worker) + + # Wait for all workers to finish, handling ^C as needed. + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + # First try to drain the queue of work and let the + # running tests complete. + while not job_queue.empty(): + try: + # Just drain it to stop more work from being started. + job_queue.get_nowait() + except Queue.Empty: + pass + + print ('\nFirst KeyboardInterrupt received, stopping ' + 'future work. Press again to hard-stop existing tests.') + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + print ('\nSecond KeyboardInterrupt received, killing ' + 'all worker process trees.') + kill_all_worker_processes(workers, inferior_pid_events) + + test_results = [] + while not result_queue.empty(): + test_results.append(result_queue.get(block=False)) + return test_results + + +def multiprocessing_test_runner_pool(num_threads, test_work_items): + # Initialize our global state. + initialize_global_vars_multiprocessing(num_threads, test_work_items) + + pool = multiprocessing.Pool( + num_threads, + initializer=setup_global_variables, + initargs=(output_lock, test_counter, total_tests, test_name_len, + dotest_options)) + return pool.map(process_dir_worker_multiprocessing_pool, test_work_items) + + +def inprocess_exec_test_runner(test_work_items): + # Initialize our global state. + initialize_global_vars_multiprocessing(1, test_work_items) + return map(process_dir_mapper_inprocess, test_work_items) + + +def walk_and_invoke(test_directory, test_subdir, dotest_argv, + test_runner_func): + """Look for matched files and invoke test driver on each one. + In single-threaded mode, each test driver is invoked directly. 
+ In multi-threaded mode, submit each test driver to a worker + queue, and then wait for all to complete. + + test_directory - lldb/test/ directory + test_subdir - lldb/test/ or a subfolder with the tests we're interested in + running + """ + + # Collect the test files that we'll run. + test_work_items = [] + find_test_files_in_dir_tree( + test_subdir, lambda testdir, test_files: test_work_items.append([ + test_subdir, test_files, test_directory, dotest_argv, None])) + + # Convert test work items into test results using whatever + # was provided as the test run function. + test_results = test_runner_func(test_work_items) + + # Summarize the results and return to caller. timed_out = sum([result[0] for result in test_results], []) passed = sum([result[1] for result in test_results], []) failed = sum([result[2] for result in test_results], []) @@ -268,7 +478,8 @@ pass_count = sum([result[4] for result in test_results]) fail_count = sum([result[5] for result in test_results]) - return (timed_out, passed, failed, unexpected_successes, pass_count, fail_count) + return (timed_out, passed, failed, unexpected_successes, pass_count, + fail_count) def getExpectedTimeouts(platform_name): @@ -354,7 +565,39 @@ return result -def main(print_details_on_success, num_threads, test_subdir): +def get_test_runner_strategies(num_threads): + """Returns the test runner strategies by name in a dictionary. + + @param num_threads specifies the number of threads/processes + that will be used for concurrent test runners. + + @return dictionary with key as test runner strategy name and + value set to a callable object that takes the test work item + and returns a test result tuple. + """ + return { + # multiprocessing supports ctrl-c and does not use + # multiprocessing.Pool. + "multiprocessing": + (lambda work_items: multiprocessing_test_runner( + num_threads, work_items)), + + # multiprocessing-pool uses multiprocessing.Pool but + # does not support Ctrl-C. 
+ "multiprocessing-pool": + (lambda work_items: multiprocessing_test_runner_pool( + num_threads, work_items)), + + # serial uses the subprocess-based, single process + # test runner. This provides process isolation but + # no concurrent test running. + "serial": + inprocess_exec_test_runner + } + + +def main(print_details_on_success, num_threads, test_subdir, + test_runner_name): """Run dotest.py in inferior mode in parallel. @param print_details_on_success the parsed value of the output-on-success @@ -368,6 +611,13 @@ @param test_subdir optionally specifies a subdir to limit testing within. May be None if the entire test tree is to be used. This subdir is assumed to be relative to the lldb/test root of the test hierarchy. + + @param test_runner_name if specified, contains the test runner + name which selects the strategy used to run the isolated and + optionally concurrent test runner. Specify None to allow the + system to choose the most appropriate test runner given desired + thread count and OS type. + """ dotest_argv = sys.argv[1:] @@ -435,8 +685,38 @@ num_threads = 1 system_info = " ".join(platform.uname()) - (timed_out, passed, failed, unexpected_successes, pass_count, fail_count) = walk_and_invoke( - test_directory, test_subdir, dotest_argv, num_threads) + + # Figure out which testrunner strategy we'll use. + runner_strategies_by_name = get_test_runner_strategies(num_threads) + + # If the user didn't specify a test runner strategy, determine + # the default now based on number of threads and OS type. + if not test_runner_name: + if num_threads == 1: + # Use the serial runner. + test_runner_name = "serial" + elif os.name == "nt": + # Currently the multiprocessing test runner with ctrl-c + # support isn't running correctly on nt. Use the pool + # support without ctrl-c. + test_runner_name = "multiprocessing-pool" + else: + # For everyone else, use the ctrl-c-enabled + # multiprocessing support. 
+ test_runner_name = "multiprocessing" + + if test_runner_name not in runner_strategies_by_name: + raise ("specified testrunner name '{}' unknown. " + "Valid choices: {}".format( + test_runner_name, + runner_strategies_by_name.keys())) + test_runner_func = runner_strategies_by_name[test_runner_name] + + summary_results = walk_and_invoke( + test_directory, test_subdir, dotest_argv, test_runner_func) + + (timed_out, passed, failed, unexpected_successes, pass_count, + fail_count) = summary_results timed_out = set(timed_out) num_test_files = len(passed) + len(failed)
_______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits