ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper class
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f10a6ff Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f10a6ff Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f10a6ff Branch: refs/heads/ACCUMULO-1854-multi-aif Commit: 0f10a6ffb0400424d30d3f49312bb500265cb276 Parents: 1fe2238 Author: Josh Elser <els...@apache.org> Authored: Wed Nov 6 17:51:06 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Wed Nov 6 17:51:06 2013 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AccumuloOutputFormat.java | 130 +++---------- .../core/client/mapreduce/InputFormatBase.java | 184 +++++-------------- .../client/mapreduce/SequencedFormatHelper.java | 145 +++++++++++++++ .../mapreduce/AccumuloInputFormatTest.java | 7 +- 4 files changed, 214 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index 5e5e43d..dd9762e 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -96,14 +96,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute private static final int DEFAULT_NUM_WRITE_THREADS = 2; - private static final int DEFAULT_SEQUENCE = 0; private static final String SEQ_DELIM = "."; - private static final String COMMA = ","; private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs"; - private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed"; - private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs"; - private static final String TRUE = "true"; @@ -113,17 +108,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @return A unique number to provide to future AccumuloInputFormat calls */ public static synchronized int nextSequence(Configuration conf) { - String value = conf.get(CONFIGURED_SEQUENCES); - if (null == value) { - conf.set(CONFIGURED_SEQUENCES, "1"); - return 1; - } else { - String[] splitValues = StringUtils.split(value, COMMA); - int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1; - - conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue); - return newValue; - } + return SequencedFormatHelper.nextSequence(conf, PREFIX); } /** @@ -133,84 +118,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @throws NoSuchElementException */ protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException { - String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES); - - // We haven't set anything, so we need to find the first to return - if (null == processedConfs || 0 == processedConfs.length) { - // Check to see if the default sequence was used - boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false); - - // If so, set that we're processing it and return the value of the default - if (defaultSeqUsed) { - conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE)); - return DEFAULT_SEQUENCE; - } - - String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES); - - // There was *nothing* loaded, fail. - if (null == loadedConfs || 0 == loadedConfs.length) { - throw new NoSuchElementException("Sequence was requested to process but none exist to return"); - } - - // We have loaded configuration(s), use the first - int firstLoaded = Integer.parseInt(loadedConfs[0]); - conf.setInt(PROCESSED_SEQUENCES, firstLoaded); - - return firstLoaded; - } - - // We've previously parsed some confs, need to find the next one to load - int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]); - String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES); - - // We only have the default sequence, no specifics. - // Getting here, we already know that we processed that default - if (null == configuredSequencesArray) { - return -1; - } - - List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1); - - // If we used the default sequence ID, add that into the list of configured sequences - if (conf.getBoolean(DEFAULT_SEQ_USED, false)) { - configuredSequences.add(DEFAULT_SEQUENCE); - } - - // Add the rest of any sequences to our list - for (String configuredSequence : configuredSequencesArray) { - configuredSequences.add(Integer.parseInt(configuredSequence)); - } - - int lastParsedSeqIndex = configuredSequences.size() - 1; - - // Find the next sequence number after the one we last processed - for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) { - int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex); - - if (lastLoadedValue == lastProcessedSeq) { - break; - } - } - - // We either had no sequences to match or we matched the last configured sequence - // Both of which are equivalent to no (more) sequences to process - if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) { - return -1; - } - - // Get the value of the sequence at that offset - int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1); - conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence); - - return nextSequence; + return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX); } protected static void setDefaultSequenceUsed(Configuration conf) { - String value = conf.get(DEFAULT_SEQ_USED); - if (null == value || !TRUE.equals(value)) { - conf.setBoolean(DEFAULT_SEQ_USED, true); - } + SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX); } protected static String merge(String name, Integer sequence) { @@ -266,7 +178,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { */ public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { setDefaultSequenceUsed(conf); - setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable); + setOutputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable); } /** @@ -306,7 +218,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { setDefaultSequenceUsed(conf); - setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers); + setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers); } public static void setZooKeeperInstance(Configuration conf, int sequence, String instanceName, String zooKeepers) { @@ -329,7 +241,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setMockInstance(Configuration conf, String instanceName) { setDefaultSequenceUsed(conf); - setMockInstance(conf, DEFAULT_SEQUENCE, instanceName); + setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName); } public static void setMockInstance(Configuration conf, int sequence, String instanceName) { @@ -347,7 +259,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { setDefaultSequenceUsed(conf); - setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes); + setMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfBytes); } public static void setMaxMutationBufferSize(Configuration conf, int sequence, long numberOfBytes) { @@ -363,7 +275,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { setDefaultSequenceUsed(conf); - setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds); + setMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfMilliseconds); } public static void setMaxLatency(Configuration conf, int sequence, int numberOfMilliseconds) { @@ -379,7 +291,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { setDefaultSequenceUsed(conf); - setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads); + setMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfThreads); } public static void setMaxWriteThreads(Configuration conf, int sequence, int numberOfThreads) { @@ -395,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setLogLevel(Configuration conf, Level level) { setDefaultSequenceUsed(conf); - setLogLevel(conf, DEFAULT_SEQUENCE, level); + setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level); } public static void setLogLevel(Configuration conf, int sequence, Level level) { @@ -412,7 +324,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public static void setSimulationMode(Configuration conf) { setDefaultSequenceUsed(conf); - setSimulationMode(conf, DEFAULT_SEQUENCE); + setSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } public static void setSimulationMode(Configuration conf, int sequence) { @@ -427,7 +339,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static String getUsername(Configuration conf) { - return getUsername(conf, DEFAULT_SEQUENCE); + return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static String getUsername(Configuration conf, int sequence) { @@ -444,7 +356,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { return getPassword(job.getConfiguration()); } protected static byte[] getPassword(Configuration conf) { - return getPassword(conf, DEFAULT_SEQUENCE); + return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -463,7 +375,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static boolean canCreateTables(Configuration conf) { - return canCreateTables(conf, DEFAULT_SEQUENCE); + return canCreateTables(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static boolean canCreateTables(Configuration conf, int sequence) { @@ -478,7 +390,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static String getDefaultTableName(Configuration conf) { - return getDefaultTableName(conf, DEFAULT_SEQUENCE); + return getDefaultTableName(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static String getDefaultTableName(Configuration conf, int sequence) { @@ -493,7 +405,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static Instance getInstance(Configuration conf) { - return getInstance(conf, DEFAULT_SEQUENCE); + return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static Instance getInstance(Configuration conf, int sequence) { @@ -510,7 +422,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static long getMaxMutationBufferSize(Configuration conf) { - return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE); + return getMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static long getMaxMutationBufferSize(Configuration conf, int sequence) { @@ -525,7 +437,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static int getMaxLatency(Configuration conf) { - return getMaxLatency(conf, DEFAULT_SEQUENCE); + return getMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static int getMaxLatency(Configuration conf, int sequence) { @@ -540,7 +452,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static int getMaxWriteThreads(Configuration conf) { - return getMaxWriteThreads(conf, DEFAULT_SEQUENCE); + return getMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static int getMaxWriteThreads(Configuration conf, int sequence) { @@ -555,7 +467,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static Level getLogLevel(Configuration conf) { - return getLogLevel(conf, DEFAULT_SEQUENCE); + return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static Level getLogLevel(Configuration conf, int sequence) { @@ -573,7 +485,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { } protected static boolean getSimulationMode(Configuration conf) { - return getSimulationMode(conf, DEFAULT_SEQUENCE); + return getSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static boolean getSimulationMode(Configuration conf, int sequence) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index 7042f19..5c87c13 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@ -151,124 +151,30 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { private static final String ITERATORS_DELIM = ","; private static final String SEQ_DELIM = "."; - protected static final int DEFAULT_SEQUENCE = 0; - private static final String COMMA = ","; - private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs"; - private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed"; - private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs"; - private static final String TRUE = "true"; private static final String READ_OFFLINE = PREFIX + ".read.offline"; + protected static String merge(String name, Integer sequence) { + return name + SEQ_DELIM + sequence; + } + + /** * Get a unique identifier for these configurations * * @return A unique number to provide to future AccumuloInputFormat calls */ public static synchronized int nextSequence(Configuration conf) { - String value = conf.get(CONFIGURED_SEQUENCES); - if (null == value) { - conf.set(CONFIGURED_SEQUENCES, "1"); - return 1; - } else { - String[] splitValues = StringUtils.split(value, COMMA); - int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1; - - conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue); - return newValue; - } + return SequencedFormatHelper.nextSequence(conf, PREFIX); } - /** - * Using the provided Configuration, return the next sequence number to process. - * @param conf A Configuration object used to store AccumuloInputFormat information into - * @return The next sequence number to process, -1 when finished. - * @throws NoSuchElementException - */ - protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException { - String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES); - - // We haven't set anything, so we need to find the first to return - if (null == processedConfs || 0 == processedConfs.length) { - // Check to see if the default sequence was used - boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false); - - // If so, set that we're processing it and return the value of the default - if (defaultSeqUsed) { - conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE)); - return DEFAULT_SEQUENCE; - } - - String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES); - - // There was *nothing* loaded, fail. - if (null == loadedConfs || 0 == loadedConfs.length) { - throw new NoSuchElementException("Sequence was requested to process but none exist to return"); - } - - // We have loaded configuration(s), use the first - int firstLoaded = Integer.parseInt(loadedConfs[0]); - conf.setInt(PROCESSED_SEQUENCES, firstLoaded); - - return firstLoaded; - } - - // We've previously parsed some confs, need to find the next one to load - int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]); - String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES); - - // We only have the default sequence, no specifics. - // Getting here, we already know that we processed that default - if (null == configuredSequencesArray) { - return -1; - } - - List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1); - - // If we used the default sequence ID, add that into the list of configured sequences - if (conf.getBoolean(DEFAULT_SEQ_USED, false)) { - configuredSequences.add(DEFAULT_SEQUENCE); - } - - // Add the rest of any sequences to our list - for (String configuredSequence : configuredSequencesArray) { - configuredSequences.add(Integer.parseInt(configuredSequence)); - } - - int lastParsedSeqIndex = configuredSequences.size() - 1; - - // Find the next sequence number after the one we last processed - for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) { - int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex); - - if (lastLoadedValue == lastProcessedSeq) { - break; - } - } - - // We either had no sequences to match or we matched the last configured sequence - // Both of which are equivalent to no (more) sequences to process - if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) { - return -1; - } - - // Get the value of the sequence at that offset - int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1); - conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence); - - return nextSequence; + protected static int nextSequenceToProcess(Configuration conf) { + return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX); } protected static void setDefaultSequenceUsed(Configuration conf) { - String value = conf.get(DEFAULT_SEQ_USED); - if (null == value || !TRUE.equals(value)) { - conf.setBoolean(DEFAULT_SEQ_USED, true); - } - } - - protected static String merge(String name, Integer sequence) { - return name + SEQ_DELIM + sequence; + SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX); } public static Map<String,String> getRelevantEntries(Configuration conf) { @@ -302,7 +208,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setIsolated(Configuration conf, boolean enable) { setDefaultSequenceUsed(conf); - setIsolated(conf, DEFAULT_SEQUENCE, enable); + setIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable); } /** @@ -334,7 +240,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setLocalIterators(Configuration conf, boolean enable) { setDefaultSequenceUsed(conf); - setLocalIterators(conf, DEFAULT_SEQUENCE, enable); + setLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable); } /** @@ -372,7 +278,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) { setDefaultSequenceUsed(conf); - setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths); + setInputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, table, auths); } /** @@ -422,7 +328,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { setDefaultSequenceUsed(conf); - setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers); + setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers); } /** @@ -463,7 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setMockInstance(Configuration conf, String instanceName) { setDefaultSequenceUsed(conf); - setMockInstance(conf, DEFAULT_SEQUENCE, instanceName); + setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName); } /** @@ -497,7 +403,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setRanges(Configuration conf, Collection<Range> ranges) { setDefaultSequenceUsed(conf); - setRanges(conf, DEFAULT_SEQUENCE, ranges); + setRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, ranges); } /** @@ -539,7 +445,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void disableAutoAdjustRanges(Configuration conf) { setDefaultSequenceUsed(conf); - disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE); + disableAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -569,7 +475,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setRegex(JobContext job, RegexType type, String regex) { setDefaultSequenceUsed(job.getConfiguration()); - setRegex(job, DEFAULT_SEQUENCE, type, regex); + setRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type, regex); } /** @@ -626,7 +532,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException { setDefaultSequenceUsed(conf); - setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions); + setMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, maxVersions); } /** @@ -675,7 +581,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { public static void setScanOffline(Configuration conf, boolean scanOff) { setDefaultSequenceUsed(conf); - setScanOffline(conf, DEFAULT_SEQUENCE, scanOff); + setScanOffline(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, scanOff); } /** @@ -727,7 +633,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) { setDefaultSequenceUsed(conf); - fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs); + fetchColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs); } /** @@ -771,7 +677,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setLogLevel(Configuration conf, Level level) { setDefaultSequenceUsed(conf); - setLogLevel(conf, DEFAULT_SEQUENCE, level); + setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level); } /** @@ -806,7 +712,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void addIterator(Configuration conf, IteratorSetting cfg) { setDefaultSequenceUsed(conf); - addIterator(conf, DEFAULT_SEQUENCE, cfg); + addIterator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, cfg); } /** @@ -866,7 +772,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName) { setDefaultSequenceUsed(job.getConfiguration()); - setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName); + setIterator(job, SequencedFormatHelper.DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName); } /** @@ -916,7 +822,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ public static void setIteratorOption(JobContext job, String iteratorName, String key, String value) { setDefaultSequenceUsed(job.getConfiguration()); - setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value); + setIteratorOption(job, SequencedFormatHelper.DEFAULT_SEQUENCE, iteratorName, key, value); } /** @@ -967,7 +873,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setIsolated(Configuration, boolean) */ protected static boolean isIsolated(Configuration conf) { - return isIsolated(conf, DEFAULT_SEQUENCE); + return isIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -998,7 +904,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setLocalIterators(Configuration, boolean) */ protected static boolean usesLocalIterators(Configuration conf) { - return usesLocalIterators(conf, DEFAULT_SEQUENCE); + return usesLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1029,7 +935,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setInputInfo(Configuration, String, byte[], String, Authorizations) */ protected static String getUsername(Configuration conf) { - return getUsername(conf, DEFAULT_SEQUENCE); + return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1064,7 +970,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setInputInfo(Configuration, String, byte[], String, Authorizations) */ protected static byte[] getPassword(Configuration conf) { - return getPassword(conf, DEFAULT_SEQUENCE); + return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1096,7 +1002,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setInputInfo(Configuration, String, byte[], String, Authorizations) */ protected static String getTablename(Configuration conf) { - return getTablename(conf, DEFAULT_SEQUENCE); + return getTablename(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1127,7 +1033,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setInputInfo(Configuration, String, byte[], String, Authorizations) */ protected static Authorizations getAuthorizations(Configuration conf) { - return getAuthorizations(conf, DEFAULT_SEQUENCE); + return getAuthorizations(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1160,7 +1066,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setMockInstance(Configuration, String) */ protected static Instance getInstance(Configuration conf) { - return getInstance(conf, DEFAULT_SEQUENCE); + return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1195,7 +1101,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * if the table name set on the configuration doesn't exist */ protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException { - return getTabletLocator(conf, DEFAULT_SEQUENCE); + return getTabletLocator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1236,7 +1142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setRanges(Configuration, Collection) */ protected static List<Range> getRanges(Configuration conf) throws IOException { - return getRanges(conf, DEFAULT_SEQUENCE); + return getRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1265,7 +1171,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setRegex(JobContext, RegexType, String) */ protected static String getRegex(JobContext job, RegexType type) { - return getRegex(job, DEFAULT_SEQUENCE, type); + return getRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type); } /** @@ -1317,7 +1223,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #fetchColumns(Configuration, Collection) */ protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) { - return getFetchedColumns(conf, DEFAULT_SEQUENCE); + return getFetchedColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1355,7 +1261,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #disableAutoAdjustRanges(Configuration) */ protected static boolean getAutoAdjustRanges(Configuration conf) { - return getAutoAdjustRanges(conf, DEFAULT_SEQUENCE); + return getAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1386,7 +1292,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #setLogLevel(Configuration, Level) */ protected static Level getLogLevel(Configuration conf) { - return getLogLevel(conf, DEFAULT_SEQUENCE); + return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1421,7 +1327,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * if the configuration is improperly configured */ protected static void validateOptions(Configuration conf) throws IOException { - validateOptions(conf, DEFAULT_SEQUENCE); + validateOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } // InputFormat doesn't have the equivalent of OutputFormat's @@ -1479,7 +1385,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { */ protected static int getMaxVersions(Configuration conf) { setDefaultSequenceUsed(conf); - return getMaxVersions(conf, DEFAULT_SEQUENCE); + return getMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1495,7 +1401,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { } protected static boolean isOfflineScan(Configuration conf) { - return isOfflineScan(conf, DEFAULT_SEQUENCE); + return isOfflineScan(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } protected static boolean isOfflineScan(Configuration conf, int sequence) { @@ -1520,7 +1426,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #addIterator(Configuration, IteratorSetting) */ protected static List<AccumuloIterator> getIterators(Configuration conf) { - return getIterators(conf, DEFAULT_SEQUENCE); + return getIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1565,7 +1471,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @see #addIterator(Configuration, IteratorSetting) */ protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) { - return getIteratorOptions(conf, DEFAULT_SEQUENCE); + return getIteratorOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE); } /** @@ -1635,7 +1541,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead */ protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException { - setupIterators(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner); + setupIterators(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner); } /** @@ -1667,7 +1573,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { * @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead */ protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) { - setupMaxVersions(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner); + setupMaxVersions(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE, scanner); } /** @@ -2046,7 +1952,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> { } RangeInputSplit(String table, Range range, String[] locations) { - this(table, range, locations, DEFAULT_SEQUENCE); + this(table, range, locations, SequencedFormatHelper.DEFAULT_SEQUENCE); } RangeInputSplit(String table, Range range, String[] locations, int sequence) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java new file mode 100644 index 0000000..ff18754 --- /dev/null +++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java @@ -0,0 +1,145 @@ +package org.apache.accumulo.core.client.mapreduce; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.accumulo.core.util.ArgumentChecker; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Convenience class with methods useful to dealing with multiple configurations of AccumuloInputFormat and/or AccumuloOutputFormat in the same Configuration + * object. + */ +public class SequencedFormatHelper { + + private static final String COMMA = ","; + private static final String TRUE = "true"; + protected static final int DEFAULT_SEQUENCE = 0; + + private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed"; + private static final String CONFIGURED_SEQUENCES = ".configuredSeqs"; + private static final String PROCESSED_SEQUENCES = ".processedSeqs"; + + /** + * Get a unique identifier for these configurations + * + * @return A unique number to provide to future AccumuloInputFormat calls + */ + public static synchronized int nextSequence(Configuration conf, String prefix) { + ArgumentChecker.notNull(conf, prefix); + + final String configuredSequences = prefix + CONFIGURED_SEQUENCES; + + String value = conf.get(configuredSequences); + if (null == value) { + conf.set(configuredSequences, "1"); + return 1; + } else { + String[] splitValues = StringUtils.split(value, COMMA); + int newValue = Integer.parseInt(splitValues[splitValues.length - 1]) + 1; + + conf.set(configuredSequences, value + COMMA + newValue); + return newValue; + } + } + + protected static void setDefaultSequenceUsed(Configuration conf, String prefix) { + ArgumentChecker.notNull(conf, prefix); + + final String configuredSequences = prefix + DEFAULT_SEQ_USED; + + String value = conf.get(configuredSequences); + if (null == value || !TRUE.equals(value)) { + conf.setBoolean(configuredSequences, true); + } + } + + /** + * Using the provided Configuration, return the next sequence number to process. + * + * @param conf + * A Configuration object used to store AccumuloInputFormat information into + * @return The next sequence number to process, -1 when finished. + * @throws NoSuchElementException + */ + protected static synchronized int nextSequenceToProcess(Configuration conf, String prefix) throws NoSuchElementException { + ArgumentChecker.notNull(prefix); + + final String processedSequences = prefix + PROCESSED_SEQUENCES, defaultSequenceUsed = prefix + DEFAULT_SEQ_USED, configuredSequences = prefix + + CONFIGURED_SEQUENCES; + + String[] processedConfs = conf.getStrings(processedSequences); + + // We haven't set anything, so we need to find the first to return + if (null == processedConfs || 0 == processedConfs.length) { + // Check to see if the default sequence was used + boolean defaultSeqUsed = conf.getBoolean(defaultSequenceUsed, false); + + // If so, set that we're processing it and return the value of the default + if (defaultSeqUsed) { + conf.set(processedSequences, Integer.toString(DEFAULT_SEQUENCE)); + return DEFAULT_SEQUENCE; + } + + String[] loadedConfs = conf.getStrings(configuredSequences); + + // There was *nothing* loaded, fail. + if (null == loadedConfs || 0 == loadedConfs.length) { + throw new NoSuchElementException("Sequence was requested to process but none exist to return"); + } + + // We have loaded configuration(s), use the first + int firstLoaded = Integer.parseInt(loadedConfs[0]); + conf.setInt(processedSequences, firstLoaded); + + return firstLoaded; + } + + // We've previously parsed some confs, need to find the next one to load + int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]); + String[] configuredSequencesArray = conf.getStrings(configuredSequences); + + // We only have the default sequence, no specifics. + // Getting here, we already know that we processed that default + if (null == configuredSequencesArray) { + return -1; + } + + List<Integer> configuredSequencesList = new ArrayList<Integer>(configuredSequencesArray.length + 1); + + // If we used the default sequence ID, add that into the list of configured sequences + if (conf.getBoolean(defaultSequenceUsed, false)) { + configuredSequencesList.add(DEFAULT_SEQUENCE); + } + + // Add the rest of any sequences to our list + for (String configuredSequence : configuredSequencesArray) { + configuredSequencesList.add(Integer.parseInt(configuredSequence)); + } + + int lastParsedSeqIndex = configuredSequencesList.size() - 1; + + // Find the next sequence number after the one we last processed + for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) { + int lastLoadedValue = configuredSequencesList.get(lastParsedSeqIndex); + + if (lastLoadedValue == lastProcessedSeq) { + break; + } + } + + // We either had no sequences to match or we matched the last configured sequence + // Both of which are equivalent to no (more) sequences to process + if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequencesList.size()) { + return -1; + } + + // Get the value of the sequence at that offset + int nextSequence = configuredSequencesList.get(lastParsedSeqIndex + 1); + conf.set(processedSequences, conf.get(processedSequences) + COMMA + nextSequence); + + return nextSequence; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 98f3e7a..a6f5c48 100644 --- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; public class AccumuloInputFormatTest { @@ -101,7 +100,7 @@ public class AccumuloInputFormatTest { AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator", "WholeRow"); Configuration conf = job.getConfiguration(); - String iterators = conf.get("AccumuloInputFormat.iterators." + InputFormatBase.DEFAULT_SEQUENCE); + String iterators = conf.get("AccumuloInputFormat.iterators." + SequencedFormatHelper.DEFAULT_SEQUENCE); assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators); } @@ -164,7 +163,7 @@ public class AccumuloInputFormatTest { final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString(); - assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE)); + assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE)); List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration()); assertEquals(1, opts.size()); @@ -228,7 +227,7 @@ public class AccumuloInputFormatTest { AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue"); Configuration conf = job.getConfiguration(); - String options = conf.get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE); + String options = conf.get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE); assertEquals(new String("someIterator:aKey:aValue"), options); }