ACCUMULO-2380 Improve RunTests to produce useful mapper and job output

When passed the -m option, run.py for functional tests will emit one line on
standard output for each test that passes or fails. The output is suitable for
processing when tests are run within a mapreduce job.

The mapper for RunTests, which runs functional tests, is updated to pass -m to
run.py. It also inserts the map task attempt ID into the custom output, so that
testers can trace back any test to the task where it ran, to view its logs for
example.

The mapper also now uses counters to track the number of successes, failures,
errors, etc. Note that since the job may attempt to re-run tests that error out,
the total for these counters can exceed the total number of unique tests run.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7059c767
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7059c767
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7059c767

Branch: refs/heads/master
Commit: 7059c767bfbcd2def59556ed63cdfe443f2d336b
Parents: ac18b56
Author: Bill Havanki <bhava...@cloudera.com>
Authored: Thu Feb 20 09:16:53 2014 -0500
Committer: Bill Havanki <bhava...@cloudera.com>
Committed: Thu Feb 20 16:44:32 2014 -0500

----------------------------------------------------------------------
 .../server/test/functional/RunTests.java        | 50 ++++++++++++++++----
 test/system/auto/TestUtils.py                   |  9 ++--
 test/system/auto/run.py                         | 34 ++++++++++++-
 3 files changed, 79 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7059c767/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java b/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
index 1a19bb5..4dac215 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/test/functional/RunTests.java
@@ -16,11 +16,14 @@
  */
 package org.apache.accumulo.server.test.functional;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.server.logger.IdentityReducer;
 import org.apache.hadoop.conf.Configuration;
@@ -73,9 +76,26 @@ public class RunTests extends Configured implements Tool {
   
   static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
     
+    private static final String REDUCER_RESULT_START = "::::: ";
+    private static final int RRS_LEN = REDUCER_RESULT_START.length();
+    private Text result = new Text();
+
+    private static enum Outcome {
+      SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
+    }
+    private static final Map<Character, Outcome> OUTCOME_COUNTERS;
+    static {
+      OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
+      OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
+      OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
+      OUTCOME_COUNTERS.put('E', Outcome.ERROR);
+      OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
+      OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
+    }
+
     @Override
     protected void map(LongWritable key, Text value, Context context) throws 
IOException, InterruptedException {
-      List<String> cmd = Arrays.asList("/usr/bin/python", 
"test/system/auto/run.py", "-t", value.toString());
+      List<String> cmd = Arrays.asList("/usr/bin/python", 
"test/system/auto/run.py", "-m", "-t", value.toString());
       log.info("Running test " + cmd);
       ProcessBuilder pb = new ProcessBuilder(cmd);
       pb.directory(new File(context.getConfiguration().get("accumulo.home")));
@@ -83,19 +103,31 @@ public class RunTests extends Configured implements Tool {
       Process p = pb.start();
       p.getOutputStream().close();
       InputStream out = p.getInputStream();
-      byte[] buffer = new byte[1024];
-      int len = 0;
-      Text result = new Text();
+      InputStreamReader outr = new InputStreamReader(out);
+      BufferedReader br = new BufferedReader(outr);
+      String line;
       try {
-        while ((len = out.read(buffer)) > 0) {
-          log.info("More: " + new String(buffer, 0, len));
-          result.append(buffer, 0, len);
+        while ((line = br.readLine()) != null) {
+          log.info("More: " + line);
+          if (line.startsWith(REDUCER_RESULT_START)) {
+            String resultLine = line.substring(RRS_LEN);
+            if (resultLine.length() > 0) {
+              Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
+              if (outcome != null) {
+                context.getCounter(outcome).increment(1);
+              }
+            }
+            String taskAttemptId = context.getTaskAttemptID().toString();
+            result.set(taskAttemptId + " " + resultLine);
+            context.write(value, result);
+          }
         }
       } catch (Exception ex) {
-        log.error(ex, ex);
+        log.error(ex);
+        context.progress();
       }
+
       p.waitFor();
-      context.write(value, result);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7059c767/test/system/auto/TestUtils.py
----------------------------------------------------------------------
diff --git a/test/system/auto/TestUtils.py b/test/system/auto/TestUtils.py
index f36b017..68d981d 100644
--- a/test/system/auto/TestUtils.py
+++ b/test/system/auto/TestUtils.py
@@ -443,9 +443,12 @@ class TestUtilsMixin:
       LOG_PROPERTIES_BACKUP='%s.bkp' % LOG_PROPERTIES 
       LOG_GENERIC_BACKUP='%s.bkp' % LOG_GENERIC
       LOG_MONITOR_BACKUP='%s.bkp' % LOG_MONITOR
-      os.remove(LOG_PROPERTIES)
-      os.remove(LOG_GENERIC)
-      os.remove(LOG_MONITOR)
+      if os.path.exists(LOG_PROPERTIES):
+        os.remove(LOG_PROPERTIES)
+      if os.path.exists(LOG_GENERIC):
+        os.remove(LOG_GENERIC)
+      if os.path.exists(LOG_MONITOR):
+        os.remove(LOG_MONITOR)
       if os.path.exists(LOG_PROPERTIES_BACKUP):
         os.rename(LOG_PROPERTIES_BACKUP, LOG_PROPERTIES)
       if os.path.exists(LOG_GENERIC_BACKUP):

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7059c767/test/system/auto/run.py
----------------------------------------------------------------------
diff --git a/test/system/auto/run.py b/test/system/auto/run.py
index 4475398..299bcae 100755
--- a/test/system/auto/run.py
+++ b/test/system/auto/run.py
@@ -167,6 +167,7 @@ class _TextTestResult(unittest.TestResult):
         unittest.TestResult.__init__(self)
         self.stream = stream
         self.descriptions = descriptions
+        self.successes = []
 
     def getDescription(self, test):
         if self.descriptions:
@@ -183,6 +184,7 @@ class _TextTestResult(unittest.TestResult):
 
     def addSuccess(self, test):
         unittest.TestResult.addSuccess(self, test)
+        self.successes.append(test)
         self.stream.writeln("ok")
 
     def addError(self, test, err):
@@ -229,6 +231,9 @@ def makeDiskFailureLibrary():
     except:
         compile()
     
+def emitMapReduceResult(code, test):
+    print '::::: %s %s' % (code, str(test))
+
 def main():
     makeDiskFailureLibrary()
     
@@ -255,7 +260,9 @@ def main():
     parser.add_option('-s', '--start', dest='start', default=None, 
                       help='Start the test list at the given test name')
     parser.add_option('-x', '--xml', dest='xmlreport', default=False, 
action='store_true',
-                      help='Output tests results to xml (jenkins conpatible)')
+                      help='Output test results to xml (jenkins compatible)')
+    parser.add_option('-m', '--mapreduce', dest='mapreduce', default=False, 
action='store_true',
+                      help='Output test results suitable for mapreduce')
     parser.add_option('-f', '--timeout-factor', dest='timeout_factor',
                       default=1, type=int,
                       help="Multiplier for some timeouts (use on slower 
hardware) (%default)")
@@ -315,8 +322,9 @@ def main():
     else:
         removeInstrumentedAccumuloJars()
 
+    results = []
     for i in range(options.repeat):
-        runner.run(suite)
+        results.append(runner.run(suite))
 
     if options.coverage:
         mergeCoverage()
@@ -325,6 +333,28 @@ def main():
              os.path.join(ACCUMULO_HOME,'src','server','src','main','java')]
             )
 
+    numFailures = 0
+    doEmitMR = options.mapreduce and not options.xmlreport
+    for result in results:
+        if doEmitMR:
+            for test in result.successes:
+                emitMapReduceResult('S', test)
+            if hasattr(result, 'expectedFailures'):
+                for test, err in result.expectedFailures:
+                    emitMapReduceResult('G', test)
+            for test, err in result.failures:
+                emitMapReduceResult('F', test)
+            for test, err in result.errors:
+                emitMapReduceResult('E', test)
+        numFailures += len(result.failures)
+        numFailures += len(result.errors)
+        if hasattr(result, 'unexpectedSuccesses'):
+            if doEmitMR:
+                for test in result.unexpectedSuccesses:
+                    emitMapReduceResult('T', test)
+            numFailures += len(result.unexpectedSuccesses)
+    if numFailures > 0:
+        sys.exit(1)
 
 if __name__ == '__main__':
     main()

Reply via email to