ACCUMULO-1854 Re-implement part of the old approach to get
back to a functional state.

We couldn't rely solely on the Configuration to track state, because
getSplits is not guaranteed to receive the same Configuration on each
call. Since getSplits is always called serially by a single client, we
can use that fact to keep some state between calls and avoid reading
the same data many times.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c50a2229
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c50a2229
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c50a2229

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: c50a22296d80042a86639129a02e2b9468dc3330
Parents: 0f10a6f
Author: Josh Elser <els...@apache.org>
Authored: Thu Nov 7 00:16:17 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Thu Nov 7 00:20:00 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  | 38 ++++++++++++++---
 .../client/mapreduce/SequencedFormatHelper.java | 45 +++++++++++++++++---
 .../mapreduce/AccumuloOutputFormatTest.java     |  6 +++
 .../client/mapreduce/InputFormatBaseTest.java   |  1 +
 4 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 5c87c13..9ce98ba 100644
--- 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -42,8 +42,8 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.StringTokenizer;
-
-import javax.servlet.jsp.jstl.core.Config;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -83,7 +83,6 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -151,15 +150,22 @@ public abstract class InputFormatBase<K,V> extends 
InputFormat<K,V> {
   private static final String ITERATORS_DELIM = ",";
 
   private static final String SEQ_DELIM = ".";
-  
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
+  
+  private static final AtomicBoolean DEFAULT_SEQUENCE_READ = new 
AtomicBoolean(false);
+  private static final AtomicInteger SEQUENCES_READ = new AtomicInteger(0);
 
   protected static String merge(String name, Integer sequence) {
     return name + SEQ_DELIM + sequence;
   }
 
 
+  protected static void resetInternals() {
+    DEFAULT_SEQUENCE_READ.set(false);
+    SEQUENCES_READ.set(0);
+  }
+  
   /**
    * Get a unique identifier for these configurations
    * 
@@ -169,8 +175,25 @@ public abstract class InputFormatBase<K,V> extends 
InputFormat<K,V> {
     return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
-  protected static int nextSequenceToProcess(Configuration conf) {
-    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
+  protected static synchronized int nextSequenceToProcess(Configuration conf) {
+    boolean isDefaultSequenceUsed = 
SequencedFormatHelper.isDefaultSequenceUsed(conf, PREFIX);
+    
+    if (isDefaultSequenceUsed && !DEFAULT_SEQUENCE_READ.get()) {
+      DEFAULT_SEQUENCE_READ.set(true);
+      return 0;
+    }
+    
+    Integer[] configuredSequences = 
SequencedFormatHelper.configuredSequences(conf, PREFIX);
+    
+    int sequenceOffset = SEQUENCES_READ.getAndAdd(1);    
+    
+    if (0 == configuredSequences.length && !isDefaultSequenceUsed) {
+      throw new NoSuchElementException();
+    } else if (sequenceOffset >= configuredSequences.length) {
+      return -1;
+    }
+    
+    return configuredSequences[sequenceOffset];
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
@@ -1791,6 +1814,9 @@ public abstract class InputFormatBase<K,V> extends 
InputFormat<K,V> {
    */
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // Disclaimer: the only reason this works as it does is because getSplits 
is 
+    // called serially by the JobClient before the job starts (one node, one 
thread).
+    // If it was called by multiple nodes, this approach would fail miserably.
     final Configuration conf = job.getConfiguration();
     final int sequence = nextSequenceToProcess(conf);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
----------------------------------------------------------------------
diff --git 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
index ff18754..ab6dd3a 100644
--- 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
+++ 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
@@ -16,11 +16,11 @@ public class SequencedFormatHelper {
 
   private static final String COMMA = ",";
   private static final String TRUE = "true";
-  protected static final int DEFAULT_SEQUENCE = 0;
+  public static final int DEFAULT_SEQUENCE = 0;
 
-  private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
-  private static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
-  private static final String PROCESSED_SEQUENCES = ".processedSeqs";
+  public static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
+  public static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
+  public static final String PROCESSED_SEQUENCES = ".processedSeqs";
 
   /**
    * Get a unique identifier for these configurations
@@ -44,15 +44,46 @@ public class SequencedFormatHelper {
       return newValue;
     }
   }
+  
+  /**
+   * Returns all configured sequences but not the default sequence
+   * @param conf
+   * @param prefix
+   * @return
+   */
+  public static Integer[] configuredSequences(Configuration conf, String 
prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+    
+    final String configuredSequences = prefix + CONFIGURED_SEQUENCES;
+    String[] values = conf.getStrings(configuredSequences);
+    if (null == values) {
+      return new Integer[0];
+    }
+    
+    Integer[] intValues = new Integer[values.length];
+    for (int i = 0; i < values.length; i++) {
+      intValues[i] = Integer.parseInt(values[i]);
+    }
+    
+    return intValues;
+  }
 
+  protected static boolean isDefaultSequenceUsed(Configuration conf, String 
prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+    
+    final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED;
+    
+    return conf.getBoolean(defaultSequenceUsedKey, false);
+  }
+  
   protected static void setDefaultSequenceUsed(Configuration conf, String 
prefix) {
     ArgumentChecker.notNull(conf, prefix);
 
-    final String configuredSequences = prefix + DEFAULT_SEQ_USED;
+    final String defaultSequenceUsedKey = prefix + DEFAULT_SEQ_USED;
 
-    String value = conf.get(configuredSequences);
+    String value = conf.get(defaultSequenceUsedKey);
     if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(configuredSequences, true);
+      conf.setBoolean(defaultSequenceUsedKey, true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
 
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index 5599cae..c4c2e76 100644
--- 
a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ 
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -77,6 +78,11 @@ public class AccumuloOutputFormatTest {
     }
   }
   
+  @Before
+  public void clearInputFormatState() {
+    InputFormatBase.resetInternals();
+  }
+  
   @Test
   public void testMR() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c50a2229/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
----------------------------------------------------------------------
diff --git 
a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
 
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
index 9d167a9..f52c7a1 100644
--- 
a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
+++ 
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/InputFormatBaseTest.java
@@ -14,6 +14,7 @@ public class InputFormatBaseTest {
   
   @Before
   public void setup() {
+    InputFormatBase.resetInternals();
     conf = new Configuration();
   }
 

Reply via email to