ACCUMULO-1854 First stab at being able to use multiple
AccumuloInputFormats and AccumuloOutputFormats in the same
Configuration.

Modified the static API calls on both AIF and AOF to accept a "sequence"
number that identifies which set of configuration values a call applies
to. The original methods still exist and default to sequence '0',
preserving backwards compatibility in both API and functionality. A
simple method for obtaining a sequence number was added to both classes
and is used to properly generate InputSplits and output info.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/921617d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/921617d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/921617d4

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 921617d4dac38e81571dc852308ff4924530092b
Parents: 62c6a22
Author: Josh Elser <els...@apache.org>
Authored: Tue Nov 5 16:58:52 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Tue Nov 5 17:04:26 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  |  240 +++-
 .../core/client/mapreduce/InputFormatBase.java  | 1050 +++++++++++++-----
 .../mapreduce/AccumuloInputFormatTest.java      |   47 +-
 .../mapreduce/AccumuloOutputFormatTest.java     |   39 +
 4 files changed, 1045 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/921617d4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 02516a3..9b9041a 100644
--- 
a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ 
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.mapreduce;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -87,6 +88,29 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 
1024; // 50MB
   private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
   private static final int DEFAULT_NUM_WRITE_THREADS = 2;
+
+  private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new 
AtomicInteger(0);
+  private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new 
AtomicInteger(0);
+  private static final int DEFAULT_SEQUENCE = 0;
+  private static final String SEQ_DELIM = ".";
+
+  /**
+   * Get a unique identifier for these configurations
+   * 
+   * @return A unique number to provide to future AccumuloOutputFormat calls
+   */
+  public static int nextSequence() {
+    return NUM_CONFIGURATIONS_LOADED.incrementAndGet();
+  }
+
+  protected static String merge(String name, Integer sequence) {
+    return name + SEQ_DELIM + sequence;
+  }
+  
+  public static void resetCounters() {
+    NUM_CONFIGURATIONS_LOADED.set(0);
+    NUM_CONFIGURATIONS_PROCESSED.set(0);
+  }
   
   /**
    * Configure the output format.
@@ -122,16 +146,35 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
    *          the table to use when the tablename is null in the write call
    */
   public static void setOutputInfo(Configuration conf, String user, byte[] 
passwd, boolean createTables, String defaultTable) {
-    if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per 
job");
-    conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
+    setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, 
defaultTable);
+  }
+  
+  /**
+   * Configure the output format for the configuration identified by the given sequence number.
+   * 
+   * @param conf
+   *          the Map/Reduce job object
+   * @param user
+   *          the username, which must have the Table.CREATE permission to 
create tables
+   * @param passwd
+   *          the passwd for the username
+   * @param createTables
+   *          the output format will create new tables as necessary. Table 
names can only be alpha-numeric and underscores.
+   * @param defaultTable
+   *          the table to use when the tablename is null in the write call
+   */
+  public static void setOutputInfo(Configuration conf, int sequence, String 
user, byte[] passwd, boolean createTables, String defaultTable) {
+    final String outputInfoSet = merge(OUTPUT_INFO_HAS_BEEN_SET, sequence);
+    if (conf.getBoolean(outputInfoSet, false))
+      throw new IllegalStateException("Output info for sequence " + sequence + 
" can only be set once per job");
+    conf.setBoolean(outputInfoSet, true);
     
     ArgumentChecker.notNull(user, passwd);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.setBoolean(CREATETABLES, createTables);
+    conf.set(merge(USERNAME, sequence), user);
+    conf.set(merge(PASSWORD, sequence), new 
String(Base64.encodeBase64(passwd)));
+    conf.setBoolean(merge(CREATETABLES, sequence), createTables);
     if (defaultTable != null)
-      conf.set(DEFAULT_TABLE_NAME, defaultTable);
+      conf.set(merge(DEFAULT_TABLE_NAME, sequence), defaultTable);
   }
   
   /**
@@ -142,13 +185,18 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setZooKeeperInstance(Configuration conf, String 
instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per 
job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+    setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+  }
+  
+  public static void setZooKeeperInstance(Configuration conf, int sequence, 
String instanceName, String zooKeepers) {
+    final String instanceSet = merge(INSTANCE_HAS_BEEN_SET, sequence);
+    if (conf.getBoolean(instanceSet, false))
+      throw new IllegalStateException("Instance info for sequence " + sequence 
+ " can only be set once per job");
+    conf.setBoolean(instanceSet, true);
     
     ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    conf.set(merge(INSTANCE_NAME, sequence), instanceName);
+    conf.set(merge(ZOOKEEPERS, sequence), zooKeepers);
   }
   
   /**
@@ -159,9 +207,13 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+    setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+  }
+  
+  public static void setMockInstance(Configuration conf, int sequence, String 
instanceName) {
+    conf.setBoolean(merge(INSTANCE_HAS_BEEN_SET, sequence), true);
+    conf.setBoolean(merge(MOCK, sequence), true);
+    conf.set(merge(INSTANCE_NAME, sequence), instanceName);
   }
   
   /**
@@ -172,7 +224,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, long 
numberOfBytes) {
-    conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+    setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
+  }
+  
+  public static void setMaxMutationBufferSize(Configuration conf, int 
sequence, long numberOfBytes) {
+    conf.setLong(merge(MAX_MUTATION_BUFFER_SIZE, sequence), numberOfBytes);
   }
   
   /**
@@ -183,7 +239,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxLatency(Configuration conf, int 
numberOfMilliseconds) {
-    conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+    setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
+  }
+  
+  public static void setMaxLatency(Configuration conf, int sequence, int 
numberOfMilliseconds) {
+    conf.setInt(merge(MAX_LATENCY, sequence), numberOfMilliseconds);
   }
   
   /**
@@ -194,7 +254,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxWriteThreads(Configuration conf, int 
numberOfThreads) {
-    conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+    setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
+  }
+  
+  public static void setMaxWriteThreads(Configuration conf, int sequence, int 
numberOfThreads) {
+    conf.setInt(merge(NUM_WRITE_THREADS, sequence), numberOfThreads);
   }
   
   /**
@@ -205,8 +269,12 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setLogLevel(Configuration conf, Level level) {
+    setLogLevel(conf, DEFAULT_SEQUENCE, level);
+  }
+  
+  public static void setLogLevel(Configuration conf, int sequence, Level 
level) {
     ArgumentChecker.notNull(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+    conf.setInt(merge(LOGLEVEL, sequence), level.toInt());
   }
   
   /**
@@ -217,7 +285,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   public static void setSimulationMode(Configuration conf) {
-    conf.setBoolean(SIMULATE, true);
+    setSimulationMode(conf, DEFAULT_SEQUENCE);
+  }
+  
+  public static void setSimulationMode(Configuration conf, int sequence) {
+    conf.setBoolean(merge(SIMULATE, sequence), true);
   }
   
   /**
@@ -228,7 +300,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+    return getUsername(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static String getUsername(Configuration conf, int sequence) {
+    return conf.get(merge(USERNAME, sequence));
   }
   
   /**
@@ -240,13 +316,16 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   protected static byte[] getPassword(JobContext job) {
     return getPassword(job.getConfiguration());
   }
+  protected static byte[] getPassword(Configuration conf) {
+    return getPassword(conf, DEFAULT_SEQUENCE);
+  }
   
   /**
    * WARNING: The password is stored in the Configuration and shared with all 
MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
    * string, and is not intended to be secure.
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(Configuration conf, int sequence) {
+    return Base64.decodeBase64(conf.get(merge(PASSWORD, sequence), 
"").getBytes());
   }
   
   /**
@@ -257,7 +336,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static boolean canCreateTables(Configuration conf) {
-    return conf.getBoolean(CREATETABLES, false);
+    return canCreateTables(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static boolean canCreateTables(Configuration conf, int sequence) {
+    return conf.getBoolean(merge(CREATETABLES, sequence), false);
   }
   
   /**
@@ -268,7 +351,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static String getDefaultTableName(Configuration conf) {
-    return conf.get(DEFAULT_TABLE_NAME);
+    return getDefaultTableName(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static String getDefaultTableName(Configuration conf, int 
sequence) {
+    return conf.get(merge(DEFAULT_TABLE_NAME, sequence));
   }
   
   /**
@@ -279,9 +366,13 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), 
conf.get(ZOOKEEPERS));
+    return getInstance(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static Instance getInstance(Configuration conf, int sequence) {
+    if (conf.getBoolean(merge(MOCK, sequence), false))
+      return new MockInstance(conf.get(merge(INSTANCE_NAME, sequence)));
+    return new ZooKeeperInstance(conf.get(merge(INSTANCE_NAME, sequence)), 
conf.get(merge(ZOOKEEPERS, sequence)));
   }
   
   /**
@@ -292,7 +383,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf) {
-    return conf.getLong(MAX_MUTATION_BUFFER_SIZE, 
DEFAULT_MAX_MUTATION_BUFFER_SIZE);
+    return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static long getMaxMutationBufferSize(Configuration conf, int 
sequence) {
+    return conf.getLong(merge(MAX_MUTATION_BUFFER_SIZE, sequence), 
DEFAULT_MAX_MUTATION_BUFFER_SIZE);
   }
   
   /**
@@ -303,7 +398,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static int getMaxLatency(Configuration conf) {
-    return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY);
+    return getMaxLatency(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static int getMaxLatency(Configuration conf, int sequence) {
+    return conf.getInt(merge(MAX_LATENCY, sequence), DEFAULT_MAX_LATENCY);
   }
   
   /**
@@ -314,7 +413,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static int getMaxWriteThreads(Configuration conf) {
-    return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS);
+    return getMaxWriteThreads(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static int getMaxWriteThreads(Configuration conf, int sequence) {
+    return conf.getInt(merge(NUM_WRITE_THREADS, sequence), 
DEFAULT_NUM_WRITE_THREADS);
   }
   
   /**
@@ -325,8 +428,13 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static Level getLogLevel(Configuration conf) {
-    if (conf.get(LOGLEVEL) != null)
-      return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+    return getLogLevel(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static Level getLogLevel(Configuration conf, int sequence) {
+    final String logLevel = merge(LOGLEVEL, sequence);
+    if (conf.get(logLevel) != null)
+      return Level.toLevel(conf.getInt(logLevel, Level.INFO.toInt()));
     return null;
   }
   
@@ -338,7 +446,11 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   }
   
   protected static boolean getSimulationMode(Configuration conf) {
-    return conf.getBoolean(SIMULATE, false);
+    return getSimulationMode(conf, DEFAULT_SEQUENCE);
+  }
+  
+  protected static boolean getSimulationMode(Configuration conf, int sequence) 
{
+    return conf.getBoolean(merge(SIMULATE, sequence), false);
   }
   
   private static class AccumuloRecordWriter extends 
RecordWriter<Text,Mutation> {
@@ -354,24 +466,25 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
     
     private Connector conn;
     
-    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, 
AccumuloSecurityException {
-      Level l = getLogLevel(attempt);
+    AccumuloRecordWriter(TaskAttemptContext attempt, int sequence) throws 
AccumuloException, AccumuloSecurityException {
+      Configuration conf = attempt.getConfiguration();
+      Level l = getLogLevel(conf, sequence);
       if (l != null)
-        log.setLevel(getLogLevel(attempt));
-      this.simulate = getSimulationMode(attempt);
-      this.createTables = canCreateTables(attempt);
+        log.setLevel(getLogLevel(conf, sequence));
+      this.simulate = getSimulationMode(conf, sequence);
+      this.createTables = canCreateTables(conf, sequence);
       
       if (simulate)
         log.info("Simulating output only. No writes to tables will occur");
       
       this.bws = new HashMap<Text,BatchWriter>();
       
-      String tname = getDefaultTableName(attempt);
+      String tname = getDefaultTableName(conf, sequence);
       this.defaultTableName = (tname == null) ? null : new Text(tname);
       
       if (!simulate) {
-        this.conn = getInstance(attempt).getConnector(getUsername(attempt), 
getPassword(attempt));
-        mtbw = 
conn.createMultiTableBatchWriter(getMaxMutationBufferSize(attempt), 
getMaxLatency(attempt), getMaxWriteThreads(attempt));
+        this.conn = getInstance(conf, sequence).getConnector(getUsername(conf, 
sequence), getPassword(conf, sequence));
+        mtbw = conn.createMultiTableBatchWriter(getMaxMutationBufferSize(conf, 
sequence), getMaxLatency(conf, sequence), getMaxWriteThreads(conf, sequence));
       }
     }
     
@@ -495,15 +608,27 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    Configuration conf = job.getConfiguration();
-    if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Output info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
+    final int sequencesToCheck = NUM_CONFIGURATIONS_LOADED.get();
+    final Configuration conf = job.getConfiguration();
+    
+    if (0 == sequencesToCheck) {
+      checkConfiguration(conf, sequencesToCheck);
+    } else {
+      for (int i = 1; i <= sequencesToCheck; i++) {
+        checkConfiguration(conf, i);
+      }
+    }
+  }
+  
+  private void checkConfiguration(Configuration conf, int sequence) throws 
IOException {
+    if (!conf.getBoolean(merge(OUTPUT_INFO_HAS_BEEN_SET, sequence), false))
+      throw new IOException("Output info for sequence " + sequence + " has not 
been set.");
+    if (!conf.getBoolean(merge(INSTANCE_HAS_BEEN_SET, sequence), false))
+      throw new IOException("Instance info for sequence " + sequence + " has 
not been set.");
     try {
-      Connector c = getInstance(job).getConnector(getUsername(job), 
getPassword(job));
-      if (!c.securityOperations().authenticateUser(getUsername(job), 
getPassword(job)))
-        throw new IOException("Unable to authenticate user");
+      Connector c = getInstance(conf, sequence).getConnector(getUsername(conf, 
sequence), getPassword(conf, sequence));
+      if (!c.securityOperations().authenticateUser(getUsername(conf, 
sequence), getPassword(conf, sequence)))
+        throw new IOException("Unable to authenticate user for sequence " + 
sequence);
     } catch (AccumuloException e) {
       throw new IOException(e);
     } catch (AccumuloSecurityException e) {
@@ -518,8 +643,21 @@ public class AccumuloOutputFormat extends 
OutputFormat<Text,Mutation> {
   
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext 
attempt) throws IOException {
+    final int sequence;
+    if (0 == NUM_CONFIGURATIONS_LOADED.get()) {
+      sequence = DEFAULT_SEQUENCE;
+      
+      log.debug("No sequence numbers were given, falling back to the default 
sequence number");
+    } else {
+      sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet();
+      
+      if (sequence > NUM_CONFIGURATIONS_LOADED.get()) {
+        log.warn("Attempting to use AccumuloOutputFormat information from 
Configuration using a sequence number that wasn't assigned");
+      }
+    }
+    
     try {
-      return new AccumuloRecordWriter(attempt);
+      return new AccumuloRecordWriter(attempt, sequence);
     } catch (Exception e) {
       throw new IOException(e);
     }

Reply via email to