ACCUMULO-1854 First stab at being able to use multiple AccumuloInputFormats and AccumuloOutputFormats in the same Configuration.
Modified the static API calls on both AccumuloInputFormat (AIF) and AccumuloOutputFormat (AOF) to include the notion of a "sequence" number. The original methods still exist and default to sequence '0', preserving backwards compatibility of both the API and its behavior. A simple method for obtaining a sequence number was added to both classes and is used to properly generate InputSplits and output info. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/921617d4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/921617d4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/921617d4 Branch: refs/heads/ACCUMULO-1854-multi-aif Commit: 921617d4dac38e81571dc852308ff4924530092b Parents: 62c6a22 Author: Josh Elser <els...@apache.org> Authored: Tue Nov 5 16:58:52 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Tue Nov 5 17:04:26 2013 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AccumuloOutputFormat.java | 240 +++- .../core/client/mapreduce/InputFormatBase.java | 1050 +++++++++++++----- .../mapreduce/AccumuloInputFormatTest.java | 47 +- .../mapreduce/AccumuloOutputFormatTest.java | 39 + 4 files changed, 1045 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/921617d4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index 02516a3..9b9041a 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -19,6 +19,7 @@ package 
org.apache.accumulo.core.client.mapreduce; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -87,6 +88,29 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute private static final int DEFAULT_NUM_WRITE_THREADS = 2; + + private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new AtomicInteger(0); + private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new AtomicInteger(0); + private static final int DEFAULT_SEQUENCE = 0; + private static final String SEQ_DELIM = "."; + + /** + * Get a unique identifier for these configurations + * + * @return A unique number to provide to future AccumuloInputFormat calls + */ + public static int nextSequence() { + return NUM_CONFIGURATIONS_LOADED.incrementAndGet(); + } + + protected static String merge(String name, Integer sequence) { + return name + SEQ_DELIM + sequence; + } + + public static void resetCounters() { + NUM_CONFIGURATIONS_LOADED.set(0); + NUM_CONFIGURATIONS_PROCESSED.set(0); + } /** * Configure the output format. @@ -122,16 +146,35 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * the table to use when the tablename is null in the write call */ public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { - if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Output info can only be set once per job"); - conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); + setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable); + } + + /** + * Configure the output format. 
+ * + * @param conf + * the Map/Reduce job object + * @param user + * the username, which must have the Table.CREATE permission to create tables + * @param passwd + * the passwd for the username + * @param createTables + * the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores. + * @param defaultTable + * the table to use when the tablename is null in the write call + */ + public static void setOutputInfo(Configuration conf, int sequence, String user, byte[] passwd, boolean createTables, String defaultTable) { + final String outputInfoSet = merge(OUTPUT_INFO_HAS_BEEN_SET, sequence); + if (conf.getBoolean(outputInfoSet, false)) + throw new IllegalStateException("Output info for sequence " + sequence + " can only be set once per job"); + conf.setBoolean(outputInfoSet, true); ArgumentChecker.notNull(user, passwd); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - conf.setBoolean(CREATETABLES, createTables); + conf.set(merge(USERNAME, sequence), user); + conf.set(merge(PASSWORD, sequence), new String(Base64.encodeBase64(passwd))); + conf.setBoolean(merge(CREATETABLES, sequence), createTables); if (defaultTable != null) - conf.set(DEFAULT_TABLE_NAME, defaultTable); + conf.set(merge(DEFAULT_TABLE_NAME, sequence), defaultTable); } /** @@ -142,13 +185,18 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); + setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers); + } + + public static void setZooKeeperInstance(Configuration conf, int sequence, String instanceName, String zooKeepers) { + final String instanceSet = merge(INSTANCE_HAS_BEEN_SET, sequence); + if 
(conf.getBoolean(instanceSet, false)) + throw new IllegalStateException("Instance info for sequence " + sequence + " can only be set once per job"); + conf.setBoolean(instanceSet, true); ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); + conf.set(merge(INSTANCE_NAME, sequence), instanceName); + conf.set(merge(ZOOKEEPERS, sequence), zooKeepers); } /** @@ -159,9 +207,13 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setMockInstance(Configuration conf, String instanceName) { - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - conf.setBoolean(MOCK, true); - conf.set(INSTANCE_NAME, instanceName); + setMockInstance(conf, DEFAULT_SEQUENCE, instanceName); + } + + public static void setMockInstance(Configuration conf, int sequence, String instanceName) { + conf.setBoolean(merge(INSTANCE_HAS_BEEN_SET, sequence), true); + conf.setBoolean(merge(MOCK, sequence), true); + conf.set(merge(INSTANCE_NAME, sequence), instanceName); } /** @@ -172,7 +224,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { - conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); + setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes); + } + + public static void setMaxMutationBufferSize(Configuration conf, int sequence, long numberOfBytes) { + conf.setLong(merge(MAX_MUTATION_BUFFER_SIZE, sequence), numberOfBytes); } /** @@ -183,7 +239,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { - conf.setInt(MAX_LATENCY, numberOfMilliseconds); + setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds); + } + + public static void setMaxLatency(Configuration conf, int sequence, int numberOfMilliseconds) { + conf.setInt(merge(MAX_LATENCY, sequence), 
numberOfMilliseconds); } /** @@ -194,7 +254,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { - conf.setInt(NUM_WRITE_THREADS, numberOfThreads); + setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads); + } + + public static void setMaxWriteThreads(Configuration conf, int sequence, int numberOfThreads) { + conf.setInt(merge(NUM_WRITE_THREADS, sequence), numberOfThreads); } /** @@ -205,8 +269,12 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setLogLevel(Configuration conf, Level level) { + setLogLevel(conf, DEFAULT_SEQUENCE, level); + } + + public static void setLogLevel(Configuration conf, int sequence, Level level) { ArgumentChecker.notNull(level); - conf.setInt(LOGLEVEL, level.toInt()); + conf.setInt(merge(LOGLEVEL, sequence), level.toInt()); } /** @@ -217,7 +285,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } public static void setSimulationMode(Configuration conf) { - conf.setBoolean(SIMULATE, true); + setSimulationMode(conf, DEFAULT_SEQUENCE); + } + + public static void setSimulationMode(Configuration conf, int sequence) { + conf.setBoolean(merge(SIMULATE, sequence), true); } /** @@ -228,7 +300,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static String getUsername(Configuration conf) { - return conf.get(USERNAME); + return getUsername(conf, DEFAULT_SEQUENCE); + } + + protected static String getUsername(Configuration conf, int sequence) { + return conf.get(merge(USERNAME, sequence)); } /** @@ -240,13 +316,16 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { protected static byte[] getPassword(JobContext job) { return getPassword(job.getConfiguration()); } + protected static byte[] getPassword(Configuration conf) { + return getPassword(conf, DEFAULT_SEQUENCE); + } /** * WARNING: The password is 
stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a * string, and is not intended to be secure. */ - protected static byte[] getPassword(Configuration conf) { - return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); + protected static byte[] getPassword(Configuration conf, int sequence) { + return Base64.decodeBase64(conf.get(merge(PASSWORD, sequence), "").getBytes()); } /** @@ -257,7 +336,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static boolean canCreateTables(Configuration conf) { - return conf.getBoolean(CREATETABLES, false); + return canCreateTables(conf, DEFAULT_SEQUENCE); + } + + protected static boolean canCreateTables(Configuration conf, int sequence) { + return conf.getBoolean(merge(CREATETABLES, sequence), false); } /** @@ -268,7 +351,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static String getDefaultTableName(Configuration conf) { - return conf.get(DEFAULT_TABLE_NAME); + return getDefaultTableName(conf, DEFAULT_SEQUENCE); + } + + protected static String getDefaultTableName(Configuration conf, int sequence) { + return conf.get(merge(DEFAULT_TABLE_NAME, sequence)); } /** @@ -279,9 +366,13 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static Instance getInstance(Configuration conf) { - if (conf.getBoolean(MOCK, false)) - return new MockInstance(conf.get(INSTANCE_NAME)); - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + return getInstance(conf, DEFAULT_SEQUENCE); + } + + protected static Instance getInstance(Configuration conf, int sequence) { + if (conf.getBoolean(merge(MOCK, sequence), false)) + return new MockInstance(conf.get(merge(INSTANCE_NAME, sequence))); + return new ZooKeeperInstance(conf.get(merge(INSTANCE_NAME, sequence)), conf.get(merge(ZOOKEEPERS, sequence))); } /** @@ -292,7 +383,11 @@ public 
class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static long getMaxMutationBufferSize(Configuration conf) { - return conf.getLong(MAX_MUTATION_BUFFER_SIZE, DEFAULT_MAX_MUTATION_BUFFER_SIZE); + return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE); + } + + protected static long getMaxMutationBufferSize(Configuration conf, int sequence) { + return conf.getLong(merge(MAX_MUTATION_BUFFER_SIZE, sequence), DEFAULT_MAX_MUTATION_BUFFER_SIZE); } /** @@ -303,7 +398,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static int getMaxLatency(Configuration conf) { - return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY); + return getMaxLatency(conf, DEFAULT_SEQUENCE); + } + + protected static int getMaxLatency(Configuration conf, int sequence) { + return conf.getInt(merge(MAX_LATENCY, sequence), DEFAULT_MAX_LATENCY); } /** @@ -314,7 +413,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static int getMaxWriteThreads(Configuration conf) { - return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS); + return getMaxWriteThreads(conf, DEFAULT_SEQUENCE); + } + + protected static int getMaxWriteThreads(Configuration conf, int sequence) { + return conf.getInt(merge(NUM_WRITE_THREADS, sequence), DEFAULT_NUM_WRITE_THREADS); } /** @@ -325,8 +428,13 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static Level getLogLevel(Configuration conf) { - if (conf.get(LOGLEVEL) != null) - return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); + return getLogLevel(conf, DEFAULT_SEQUENCE); + } + + protected static Level getLogLevel(Configuration conf, int sequence) { + final String logLevel = merge(LOGLEVEL, sequence); + if (conf.get(logLevel) != null) + return Level.toLevel(conf.getInt(logLevel, Level.INFO.toInt())); return null; } @@ -338,7 +446,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected 
static boolean getSimulationMode(Configuration conf) { - return conf.getBoolean(SIMULATE, false); + return getSimulationMode(conf, DEFAULT_SEQUENCE); + } + + protected static boolean getSimulationMode(Configuration conf, int sequence) { + return conf.getBoolean(merge(SIMULATE, sequence), false); } private static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> { @@ -354,24 +466,25 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { private Connector conn; - AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException { - Level l = getLogLevel(attempt); + AccumuloRecordWriter(TaskAttemptContext attempt, int sequence) throws AccumuloException, AccumuloSecurityException { + Configuration conf = attempt.getConfiguration(); + Level l = getLogLevel(conf, sequence); if (l != null) - log.setLevel(getLogLevel(attempt)); - this.simulate = getSimulationMode(attempt); - this.createTables = canCreateTables(attempt); + log.setLevel(getLogLevel(conf, sequence)); + this.simulate = getSimulationMode(conf, sequence); + this.createTables = canCreateTables(conf, sequence); if (simulate) log.info("Simulating output only. No writes to tables will occur"); this.bws = new HashMap<Text,BatchWriter>(); - String tname = getDefaultTableName(attempt); + String tname = getDefaultTableName(conf, sequence); this.defaultTableName = (tname == null) ? 
null : new Text(tname); if (!simulate) { - this.conn = getInstance(attempt).getConnector(getUsername(attempt), getPassword(attempt)); - mtbw = conn.createMultiTableBatchWriter(getMaxMutationBufferSize(attempt), getMaxLatency(attempt), getMaxWriteThreads(attempt)); + this.conn = getInstance(conf, sequence).getConnector(getUsername(conf, sequence), getPassword(conf, sequence)); + mtbw = conn.createMultiTableBatchWriter(getMaxMutationBufferSize(conf, sequence), getMaxLatency(conf, sequence), getMaxWriteThreads(conf, sequence)); } } @@ -495,15 +608,27 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { @Override public void checkOutputSpecs(JobContext job) throws IOException { - Configuration conf = job.getConfiguration(); - if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IOException("Output info has not been set."); - if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IOException("Instance info has not been set."); + final int sequencesToCheck = NUM_CONFIGURATIONS_LOADED.get(); + final Configuration conf = job.getConfiguration(); + + if (0 == sequencesToCheck) { + checkConfiguration(conf, sequencesToCheck); + } else { + for (int i = 1; i <= sequencesToCheck; i++) { + checkConfiguration(conf, i); + } + } + } + + private void checkConfiguration(Configuration conf, int sequence) throws IOException { + if (!conf.getBoolean(merge(OUTPUT_INFO_HAS_BEEN_SET, sequence), false)) + throw new IOException("Output info for sequence " + sequence + " has not been set."); + if (!conf.getBoolean(merge(INSTANCE_HAS_BEEN_SET, sequence), false)) + throw new IOException("Instance info for sequence " + sequence + " has not been set."); try { - Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job)); - if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job))) - throw new IOException("Unable to authenticate user"); + Connector c = getInstance(conf, sequence).getConnector(getUsername(conf, 
sequence), getPassword(conf, sequence)); + if (!c.securityOperations().authenticateUser(getUsername(conf, sequence), getPassword(conf, sequence))) + throw new IOException("Unable to authenticate user for sequence " + sequence); } catch (AccumuloException e) { throw new IOException(e); } catch (AccumuloSecurityException e) { @@ -518,8 +643,21 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { @Override public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException { + final int sequence; + if (0 == NUM_CONFIGURATIONS_LOADED.get()) { + sequence = DEFAULT_SEQUENCE; + + log.debug("No sequence numbers were given, falling back to the default sequence number"); + } else { + sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet(); + + if (sequence > NUM_CONFIGURATIONS_LOADED.get()) { + log.warn("Attempting to use AccumuloOutputFormat information from Configuration using a sequence number that wasn't assigned"); + } + } + try { - return new AccumuloRecordWriter(attempt); + return new AccumuloRecordWriter(attempt, sequence); } catch (Exception e) { throw new IOException(e); }