Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT Conflicts: core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/86cafd97 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/86cafd97 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/86cafd97 Branch: refs/heads/master Commit: 86cafd972793330026c6e4739a2a94320608a333 Parents: 3a1b387 22a6209 Author: Josh Elser <els...@apache.org> Authored: Tue Apr 1 17:08:00 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Tue Apr 1 17:26:44 2014 -0400 ---------------------------------------------------------------------- .../core/client/mapred/AccumuloInputFormat.java | 5 ++--- .../core/client/mapred/InputFormatBase.java | 21 ++++++++++++++++-- .../client/mapreduce/AccumuloInputFormat.java | 4 ++-- .../core/client/mapreduce/InputFormatBase.java | 23 ++++++++++++++++++-- .../BadPasswordSplitsAccumuloInputFormat.java | 2 +- .../EmptySplitsAccumuloInputFormat.java | 2 +- 6 files changed, 46 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/86cafd97/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java index 135791e,534a095..18e286a --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@@ -19,8 -19,6 +19,7 @@@ package org.apache.accumulo.core.client import java.io.IOException; import java.util.Map.Entry; +import org.apache.accumulo.core.client.ClientConfiguration; - import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@@ -54,18 -49,8 +53,18 @@@ public class AccumuloInputFormat extend @Override public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { log.setLevel(getLogLevel(job)); + + // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. - if (split instanceof RangeInputSplit) { - RangeInputSplit risplit = (RangeInputSplit) split; ++ if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) { ++ org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split; + Level level = risplit.getLogLevel(); + if (null != level) { + log.setLevel(level); + } + } + RecordReaderBase<Key,Value> recordReader = new RecordReaderBase<Key,Value>() { - + @Override public boolean next(Key key, Value value) throws IOException { if (scannerIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/86cafd97/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java index 896ab1d,0438b78..db7dcd0 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java @@@ -297,27 -496,47 +297,27 @@@ public abstract class InputFormatBase<K } /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. * * @param job - * the Hadoop context for the configured job + * the Hadoop job for the configured job * @return an Accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the job doesn't exist * @since 1.5.0 + * @deprecated since 1.6.0 */ + @Deprecated protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, job); + return InputConfigurator.getTabletLocator(CLASS, job, InputConfigurator.getInputTableName(CLASS, job)); } - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) - /** - * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * @param job - * the Hadoop context for the configured job - * @throws IOException - * if the context is improperly configured - * @since 1.5.0 - */ - protected static void validateOptions(JobConf job) throws IOException { - InputConfigurator.validateOptions(CLASS, job); - } + protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> { - /** - * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V - * types. - * - * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables: - * <ul> - * <li>Key {@link #currentKey} (used for progress reporting)</li> - * <li>int {@link #numKeysRead} (used for progress reporting)</li> - * </ul> - */ - protected abstract static class RecordReaderBase<K,V> implements RecordReader<K,V> { - protected long numKeysRead; - protected Iterator<Entry<Key,Value>> scannerIterator; - protected org.apache.accumulo.core.client.mapred.RangeInputSplit split; + @Override - protected void setupIterators(JobConf job, Scanner scanner, String tableName, RangeInputSplit split) { ++ protected void setupIterators(JobConf job, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapred.RangeInputSplit split) { + setupIterators(job, scanner, split); + } /** * Apply the configured iterators from the configuration to the scanner. @@@ -327,16 -544,381 +327,33 @@@ * @param scanner * the scanner to configure */ - protected void setupIterators(JobConf job, Scanner scanner, RangeInputSplit split) { - protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) { - for (IteratorSetting iterator : iterators) { - scanner.addScanIterator(iterator); - } - } - - /** - * Initialize a scanner over the given input split using this task attempt configuration. - */ - public void initialize(InputSplit inSplit, JobConf job) throws IOException { - Scanner scanner; - split = (org.apache.accumulo.core.client.mapred.RangeInputSplit) inSplit; - log.debug("Initializing input split: " + split.getRange()); - - Instance instance = split.getInstance(); - if (null == instance) { - instance = getInstance(job); - } - - String principal = split.getPrincipal(); - if (null == principal) { - principal = getPrincipal(job); - } - - AuthenticationToken token = split.getToken(); - if (null == token) { - String tokenClass = getTokenClass(job); - byte[] tokenBytes = getToken(job); - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - } - - Authorizations authorizations = split.getAuths(); - if (null == authorizations) { - authorizations = getScanAuthorizations(job); - } - - String table = split.getTable(); - if (null == table) { - table = getInputTableName(job); - } - - Boolean isOffline = split.isOffline(); - if (null == isOffline) { - isOffline = isOfflineScan(job); - } - - Boolean isIsolated = split.isIsolatedScan(); - if (null == isIsolated) { - isIsolated = isIsolated(job); - } - - Boolean usesLocalIterators = split.usesLocalIterators(); - if (null == usesLocalIterators) { - usesLocalIterators = usesLocalIterators(job); - } - ++ protected void setupIterators(JobConf job, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { List<IteratorSetting> iterators = split.getIterators(); + if (null == iterators) { iterators = getIterators(job); } - Set<Pair<Text,Text>> columns = split.getFetchedColumns(); - if (null == columns) { - columns = getFetchedColumns(job); - } - - try { - log.debug("Creating connector with user: " + principal); - Connector conn = instance.getConnector(principal, token); - log.debug("Creating scanner for table: " + table); - log.debug("Authorizations are: " + authorizations); - if (isOffline) { - String tokenClass = token.getClass().getCanonicalName(); - ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token)); - scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId(instance, - table), authorizations); - } else { - scanner = conn.createScanner(table, authorizations); - } - if (isIsolated) { - log.info("Creating isolated scanner"); - scanner = new IsolatedScanner(scanner); - } - if (usesLocalIterators) { - log.info("Using local iterators"); - scanner = new ClientSideIteratorScanner(scanner); - } - setupIterators(iterators, scanner); - } catch (Exception e) { - throw new IOException(e); - } - - // setup a scanner within the bounds of this split - for (Pair<Text,Text> c : columns) { - if (c.getSecond() != null) { - log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); - scanner.fetchColumn(c.getFirst(), c.getSecond()); - } else { - log.debug("Fetching column family " + c.getFirst()); - scanner.fetchColumnFamily(c.getFirst()); - } - } - - scanner.setRange(split.getRange()); - - numKeysRead = 0; - - // do this last after setting all scanner options - scannerIterator = scanner.iterator(); - } - - @Override - public void close() {} - - @Override - public long getPos() throws IOException { - return numKeysRead; - } - - @Override - public float getProgress() throws IOException { - if (numKeysRead > 0 && currentKey == null) - return 1.0f; - return split.getProgress(currentKey); - } - - protected Key currentKey = null; - - } - - Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException, - AccumuloSecurityException { - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - - Instance instance = getInstance(job); - Connector conn = instance.getConnector(getPrincipal(job), CredentialHelper.extractToken(getTokenClass(job), getToken(job))); - String tableId = Tables.getTableId(instance, tableName); - - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode "); - } - } - - for (Range range : ranges) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); - Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY); - scanner.setRange(metadataRange); - - RowIterator rowIter = new RowIterator(scanner); - - KeyExtent lastExtent = null; - - while (rowIter.hasNext()) { - Iterator<Entry<Key,Value>> row = rowIter.next(); - String last = ""; - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) { - last = entry.getValue().toString(); - } - - if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) - || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) { - location = entry.getValue().toString(); - } - - if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - - if (location != null) - return null; - - if (!extent.getTableId().toString().equals(tableId)) { - throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); - } - - if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { - throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); - } - - Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last); - if (tabletRanges == null) { - tabletRanges = new HashMap<KeyExtent,List<Range>>(); - binnedRanges.put(last, tabletRanges); - } - - List<Range> rangeList = tabletRanges.get(extent); - if (rangeList == null) { - rangeList = new ArrayList<Range>(); - tabletRanges.put(extent, rangeList); - } - - rangeList.add(range); - - if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { - break; - } - - lastExtent = extent; - } - - } - - return binnedRanges; - } - - /** - * Read the metadata table to get tablets and match up ranges to them. - */ - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - Level logLevel = getLogLevel(job); - log.setLevel(logLevel); - - validateOptions(job); - - String tableName = getInputTableName(job); - boolean autoAdjust = getAutoAdjustRanges(job); - List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job); - Instance instance = getInstance(job); - boolean offline = isOfflineScan(job); - boolean isolated = isIsolated(job); - boolean localIterators = usesLocalIterators(job); - boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass())); - Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(job); - Authorizations auths = getScanAuthorizations(job); - String principal = getPrincipal(job); - String tokenClass = getTokenClass(job); - byte[] tokenBytes = getToken(job); - - AuthenticationToken token; - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - - List<IteratorSetting> iterators = getIterators(job); - - if (ranges.isEmpty()) { - ranges = new ArrayList<Range>(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - TabletLocator tl; - try { - if (isOfflineScan(job)) { - binnedRanges = binOfflineTable(job, tableName, ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - binnedRanges = binOfflineTable(job, tableName, ranges); - } - } else { - String tableId = null; - tl = getTabletLocator(job); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (tableId == null) - tableId = Tables.getTableId(instance, tableName); - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - tl.invalidateCache(); - } - } - } catch (Exception e) { - throw new IOException(e); - } - - ArrayList<org.apache.accumulo.core.client.mapred.RangeInputSplit> splits = new ArrayList<org.apache.accumulo.core.client.mapred.RangeInputSplit>( - ranges.size()); - HashMap<Range,ArrayList<String>> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap<Range,ArrayList<String>>(); - - HashMap<String,String> hostNameCache = new HashMap<String,String>(); - - for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - - for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - splits.add(new org.apache.accumulo.core.client.mapred.RangeInputSplit(ke.clip(r), new String[] {location})); - } else { - // don't divide ranges - ArrayList<String> locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList<String>(1); - locations.add(location); - splitsToAdd.put(r, locations); - } - } - } - } - - if (!autoAdjust) - for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) - splits.add(new org.apache.accumulo.core.client.mapred.RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); - - for (org.apache.accumulo.core.client.mapred.RangeInputSplit split : splits) { - split.setTable(tableName); - split.setOffline(offline); - split.setIsolatedScan(isolated); - split.setUsesLocalIterators(localIterators); - split.setMockInstance(mockInstance); - split.setFetchedColumns(fetchedColumns); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(iterators); - split.setLogLevel(logLevel); + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); } - - return splits.toArray(new InputSplit[splits.size()]); } + /** + * @see org.apache.accumulo.core.client.mapred.RangeInputSplit + */ + @Deprecated + public static class RangeInputSplit extends org.apache.accumulo.core.client.mapred.RangeInputSplit { + public RangeInputSplit() { + super(); + } + - public RangeInputSplit(Range range, String[] locations) { - super(range, locations); ++ public RangeInputSplit(RangeInputSplit other) throws IOException { ++ super(other); ++ } ++ ++ public RangeInputSplit(String table, String tableId, Range range, String[] locations) { ++ super(table, tableId, range, locations); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/86cafd97/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index 7a82652,0220339..35400fc --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@@ -53,8 -52,8 +53,8 @@@ public class AccumuloInputFormat extend log.setLevel(getLogLevel(context)); // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. -- if (split instanceof RangeInputSplit) { -- RangeInputSplit risplit = (RangeInputSplit) split; ++ if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) { ++ org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split; Level level = risplit.getLogLevel(); if (null != level) { log.setLevel(level); http://git-wip-us.apache.org/repos/asf/accumulo/blob/86cafd97/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java index ad26dc6,5066620..d27ec61 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java @@@ -301,47 -510,827 +301,66 @@@ public abstract class InputFormatBase<K * @param context * the Hadoop context for the configured job * @return an Accumulo tablet locator - * @throws TableNotFoundException + * @throws org.apache.accumulo.core.client.TableNotFoundException * if the table name set on the configuration doesn't exist * @since 1.5.0 + * @deprecated since 1.6.0 */ + @Deprecated protected static TabletLocator getTabletLocator(JobContext context) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context)); - } - - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) - /** - * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * @param context - * the Hadoop context for the configured job - * @throws IOException - * if the context is improperly configured - * @since 1.5.0 - */ - protected static void validateOptions(JobContext context) throws IOException { - InputConfigurator.validateOptions(CLASS, getConfiguration(context)); + return InputConfigurator.getTabletLocator(CLASS, getConfiguration(context), InputConfigurator.getInputTableName(CLASS, getConfiguration(context))); } - /** - * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V - * types. - * - * Subclasses must implement {@link #nextKeyValue()} and use it to update the following variables: - * <ul> - * <li>K {@link #currentK}</li> - * <li>V {@link #currentV}</li> - * <li>Key {@link #currentKey} (used for progress reporting)</li> - * <li>int {@link #numKeysRead} (used for progress reporting)</li> - * </ul> - */ - protected abstract static class RecordReaderBase<K,V> extends RecordReader<K,V> { - protected long numKeysRead; - protected Iterator<Entry<Key,Value>> scannerIterator; - protected org.apache.accumulo.core.client.mapreduce.RangeInputSplit split; + protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> { /** - * Apply the configured iterators from the configuration to the scanner. + * Apply the configured iterators from the configuration to the scanner for the specified table name * + * @param context + * the Hadoop context for the configured job * @param scanner * the scanner to configure + * @since 1.6.0 */ - protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) { - for (IteratorSetting iterator : iterators) { - scanner.addScanIterator(iterator); - } + @Override - protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) { ++ protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + setupIterators(context, scanner, split); } /** - * Initialize a scanner over the given input split using this task attempt configuration. + * Apply the configured iterators from the configuration to the scanner. + * + * @param context + * the Hadoop context for the configured job + * @param scanner + * the scanner to configure */ - protected void setupIterators(TaskAttemptContext context, Scanner scanner, RangeInputSplit split) { - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { - Scanner scanner; - split = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) inSplit; - log.debug("Initializing input split: " + split.getRange()); - - Instance instance = split.getInstance(); - if (null == instance) { - instance = getInstance(attempt); - } - - String principal = split.getPrincipal(); - if (null == principal) { - principal = getPrincipal(attempt); - } - - AuthenticationToken token = split.getToken(); - if (null == token) { - String tokenClass = getTokenClass(attempt); - byte[] tokenBytes = getToken(attempt); - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - } - - Authorizations authorizations = split.getAuths(); - if (null == authorizations) { - authorizations = getScanAuthorizations(attempt); - } - - String table = split.getTable(); - if (null == table) { - table = getInputTableName(attempt); - } - - Boolean isOffline = split.isOffline(); - if (null == isOffline) { - isOffline = isOfflineScan(attempt); - } - - Boolean isIsolated = split.isIsolatedScan(); - if (null == isIsolated) { - isIsolated = isIsolated(attempt); - } - - Boolean usesLocalIterators = split.usesLocalIterators(); - if (null == usesLocalIterators) { - usesLocalIterators = usesLocalIterators(attempt); - } - ++ protected void setupIterators(TaskAttemptContext context, Scanner scanner, org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { List<IteratorSetting> iterators = split.getIterators(); if (null == iterators) { - iterators = getIterators(attempt); + iterators = getIterators(context); } - - Set<Pair<Text,Text>> columns = split.getFetchedColumns(); - if (null == columns) { - columns = getFetchedColumns(attempt); - } - - try { - log.debug("Creating connector with user: " + principal); - Connector conn = instance.getConnector(principal, token); - log.debug("Creating scanner for table: " + table); - log.debug("Authorizations are: " + authorizations); - if (isOffline) { - String tokenClass = token.getClass().getCanonicalName(); - ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token)); - scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId(instance, - table), authorizations); - } else { - scanner = conn.createScanner(table, authorizations); - } - if (isIsolated) { - log.info("Creating isolated scanner"); - scanner = new IsolatedScanner(scanner); - } - if (usesLocalIterators) { - log.info("Using local iterators"); - scanner = new ClientSideIteratorScanner(scanner); - } - setupIterators(iterators, scanner); - } catch (Exception e) { - throw new IOException(e); - } - - // setup a scanner within the bounds of this split - for (Pair<Text,Text> c : columns) { - if (c.getSecond() != null) { - log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); - scanner.fetchColumn(c.getFirst(), c.getSecond()); - } else { - log.debug("Fetching column family " + c.getFirst()); - scanner.fetchColumnFamily(c.getFirst()); - } - } - - scanner.setRange(split.getRange()); - - numKeysRead = 0; - - // do this last after setting all scanner options - scannerIterator = scanner.iterator(); - } - - @Override - public void close() {} - - @Override - public float getProgress() throws IOException { - if (numKeysRead > 0 && currentKey == null) - return 1.0f; - return split.getProgress(currentKey); - } - - protected K currentK = null; - protected V currentV = null; - protected Key currentKey = null; - protected Value currentValue = null; - - @Override - public K getCurrentKey() throws IOException, InterruptedException { - return currentK; - } - - @Override - public V getCurrentValue() throws IOException, InterruptedException { - return currentV; - } - } - - Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext context, String tableName, List<Range> ranges) throws TableNotFoundException, - AccumuloException, AccumuloSecurityException { - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - - Instance instance = getInstance(context); - Connector conn = instance.getConnector(getPrincipal(context), CredentialHelper.extractToken(getTokenClass(context), getToken(context))); - String tableId = Tables.getTableId(instance, tableName); - - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode "); - } - } - - for (Range range : ranges) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); - Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY); - scanner.setRange(metadataRange); - - RowIterator rowIter = new RowIterator(scanner); - - KeyExtent lastExtent = null; - - while (rowIter.hasNext()) { - Iterator<Entry<Key,Value>> row = rowIter.next(); - String last = ""; - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) { - last = entry.getValue().toString(); - } - - if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) - || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) { - location = entry.getValue().toString(); - } - - if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - - if (location != null) - return null; - - if (!extent.getTableId().toString().equals(tableId)) { - throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); - } - - if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { - throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); - } - - Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last); - if (tabletRanges == null) { - tabletRanges = new HashMap<KeyExtent,List<Range>>(); - binnedRanges.put(last, tabletRanges); - } - - List<Range> rangeList = tabletRanges.get(extent); - if (rangeList == null) { - rangeList = new ArrayList<Range>(); - tabletRanges.put(extent, rangeList); - } - - rangeList.add(range); - - if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { - break; - } - - lastExtent = extent; - } - - } - - return binnedRanges; - } - - /** - * Read the metadata table to get tablets and match up ranges to them. - */ - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException { - Level logLevel = getLogLevel(context); - log.setLevel(logLevel); - - validateOptions(context); - - String tableName = getInputTableName(context); - boolean autoAdjust = getAutoAdjustRanges(context); - List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(context)) : getRanges(context); - Instance instance = getInstance(context); - boolean offline = isOfflineScan(context); - boolean isolated = isIsolated(context); - boolean localIterators = usesLocalIterators(context); - boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass())); - Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(context); - Authorizations auths = getScanAuthorizations(context); - String principal = getPrincipal(context); - String tokenClass = getTokenClass(context); - byte[] tokenBytes = getToken(context); - - AuthenticationToken token; - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - - List<IteratorSetting> iterators = getIterators(context); - - if (ranges.isEmpty()) { - ranges = new ArrayList<Range>(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - TabletLocator tl; - try { - if (isOfflineScan(context)) { - binnedRanges = binOfflineTable(context, tableName, ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - binnedRanges = binOfflineTable(context, tableName, ranges); - } - } else { - String tableId = null; - tl = getTabletLocator(context); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (tableId == null) - tableId = Tables.getTableId(instance, tableName); - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - tl.invalidateCache(); - } - } - } catch (Exception e) { - throw new IOException(e); - } - - ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size()); - HashMap<Range,ArrayList<String>> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap<Range,ArrayList<String>>(); - - HashMap<String,String> hostNameCache = new HashMap<String,String>(); - - for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - - for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - splits.add(new org.apache.accumulo.core.client.mapreduce.RangeInputSplit(ke.clip(r), new String[] {location})); - } else { - // don't divide ranges - ArrayList<String> locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList<String>(1); - locations.add(location); - splitsToAdd.put(r, locations); - } - } - } - } - - if (!autoAdjust) - for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) - splits.add(new org.apache.accumulo.core.client.mapreduce.RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); - - for (InputSplit inputSplit : splits) { - org.apache.accumulo.core.client.mapreduce.RangeInputSplit split = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) inputSplit; - - split.setTable(tableName); - split.setOffline(offline); - split.setIsolatedScan(isolated); - split.setUsesLocalIterators(localIterators); - split.setMockInstance(mockInstance); - split.setFetchedColumns(fetchedColumns); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(iterators); - split.setLogLevel(logLevel); - } - - return splits; - } - - // ---------------------------------------------------------------------------------------------------- - // Everything below this line is deprecated and should go away in future versions - // ---------------------------------------------------------------------------------------------------- - - /** - * @deprecated since 1.5.0; Use {@link #setScanIsolation(Job, boolean)} instead. - */ - @Deprecated - public static void setIsolated(Configuration conf, boolean enable) { - InputConfigurator.setScanIsolation(CLASS, conf, enable); - } - - /** - * @deprecated since 1.5.0; Use {@link #setLocalIterators(Job, boolean)} instead. - */ - @Deprecated - public static void setLocalIterators(Configuration conf, boolean enable) { - InputConfigurator.setLocalIterators(CLASS, conf, enable); - } - - /** - * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, AuthenticationToken)}, {@link #setInputTableName(Job, String)}, and - * {@link #setScanAuthorizations(Job, Authorizations)} instead. - */ - @Deprecated - public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) { - try { - InputConfigurator.setConnectorInfo(CLASS, conf, user, new PasswordToken(passwd)); - } catch (AccumuloSecurityException e) { - throw new RuntimeException(e); - } - InputConfigurator.setInputTableName(CLASS, conf, table); - InputConfigurator.setScanAuthorizations(CLASS, conf, auths); - } - - /** - * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead. - */ - @Deprecated - public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - InputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers); - } - - /** - * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead. - */ - @Deprecated - public static void setMockInstance(Configuration conf, String instanceName) { - InputConfigurator.setMockInstance(CLASS, conf, instanceName); - } - - /** - * @deprecated since 1.5.0; Use {@link #setRanges(Job, Collection)} instead. - */ - @Deprecated - public static void setRanges(Configuration conf, Collection<Range> ranges) { - InputConfigurator.setRanges(CLASS, conf, ranges); - } - - /** - * @deprecated since 1.5.0; Use {@link #setAutoAdjustRanges(Job, boolean)} instead. - */ - @Deprecated - public static void disableAutoAdjustRanges(Configuration conf) { - InputConfigurator.setAutoAdjustRanges(CLASS, conf, false); - } - - /** - * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead. - */ - @Deprecated - public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException { - IteratorSetting vers = new IteratorSetting(1, "vers", VersioningIterator.class); - try { - VersioningIterator.setMaxVersions(vers, maxVersions); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - InputConfigurator.addIterator(CLASS, conf, vers); - } - - /** - * @deprecated since 1.5.0; Use {@link #setOfflineTableScan(Job, boolean)} instead. - */ - @Deprecated - public static void setScanOffline(Configuration conf, boolean scanOff) { - InputConfigurator.setOfflineTableScan(CLASS, conf, scanOff); - } - - /** - * @deprecated since 1.5.0; Use {@link #fetchColumns(Job, Collection)} instead. - */ - @Deprecated - public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) { - InputConfigurator.fetchColumns(CLASS, conf, columnFamilyColumnQualifierPairs); - } - - /** - * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead. - */ - @Deprecated - public static void setLogLevel(Configuration conf, Level level) { - InputConfigurator.setLogLevel(CLASS, conf, level); - } - - /** - * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} instead. - */ - @Deprecated - public static void addIterator(Configuration conf, IteratorSetting cfg) { - InputConfigurator.addIterator(CLASS, conf, cfg); - } - - /** - * @deprecated since 1.5.0; Use {@link #isIsolated(JobContext)} instead. - */ - @Deprecated - protected static boolean isIsolated(Configuration conf) { - return InputConfigurator.isIsolated(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #usesLocalIterators(JobContext)} instead. - */ - @Deprecated - protected static boolean usesLocalIterators(Configuration conf) { - return InputConfigurator.usesLocalIterators(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getPrincipal(JobContext)} instead. - */ - @Deprecated - protected static String getPrincipal(Configuration conf) { - return InputConfigurator.getPrincipal(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. - */ - @Deprecated - protected static byte[] getToken(Configuration conf) { - return InputConfigurator.getToken(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getInputTableName(JobContext)} instead. - */ - @Deprecated - protected static String getTablename(Configuration conf) { - return InputConfigurator.getInputTableName(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getScanAuthorizations(JobContext)} instead. - */ - @Deprecated - protected static Authorizations getAuthorizations(Configuration conf) { - return InputConfigurator.getScanAuthorizations(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead. - */ - @Deprecated - protected static Instance getInstance(Configuration conf) { - return InputConfigurator.getInstance(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getTabletLocator(JobContext)} instead. - */ - @Deprecated - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getRanges(JobContext)} instead. - */ - @Deprecated - protected static List<Range> getRanges(Configuration conf) throws IOException { - return InputConfigurator.getRanges(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getFetchedColumns(JobContext)} instead. - */ - @Deprecated - protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) { - return InputConfigurator.getFetchedColumns(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getAutoAdjustRanges(JobContext)} instead. - */ - @Deprecated - protected static boolean getAutoAdjustRanges(Configuration conf) { - return InputConfigurator.getAutoAdjustRanges(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead. - */ - @Deprecated - protected static Level getLogLevel(Configuration conf) { - return InputConfigurator.getLogLevel(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #validateOptions(JobContext)} instead. - */ - @Deprecated - protected static void validateOptions(Configuration conf) throws IOException { - InputConfigurator.validateOptions(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #addIterator(Job, IteratorSetting)} to add the {@link VersioningIterator} instead. - */ - @Deprecated - protected static int getMaxVersions(Configuration conf) { - // This is so convoluted, because the only reason to get the number of maxVersions is to construct the same type of IteratorSetting object we have to - // deconstruct to get at this option in the first place, but to preserve correct behavior, this appears necessary. - List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf); - for (IteratorSetting setting : iteratorSettings) { - if ("vers".equals(setting.getName()) && 1 == setting.getPriority() && VersioningIterator.class.getName().equals(setting.getIteratorClass())) { - if (setting.getOptions().containsKey("maxVersions")) - return Integer.parseInt(setting.getOptions().get("maxVersions")); - else - return -1; - } - } - return -1; - } - - /** - * @deprecated since 1.5.0; Use {@link #isOfflineScan(JobContext)} instead. - */ - @Deprecated - protected static boolean isOfflineScan(Configuration conf) { - return InputConfigurator.isOfflineScan(CLASS, conf); - } - - /** - * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead. - */ - @Deprecated - protected static List<AccumuloIterator> getIterators(Configuration conf) { - List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf); - List<AccumuloIterator> deprecatedIterators = new ArrayList<AccumuloIterator>(iteratorSettings.size()); - for (IteratorSetting setting : iteratorSettings) { - AccumuloIterator deprecatedIter = new AccumuloIterator(setting.getPriority() + AccumuloIterator.FIELD_SEP + setting.getIteratorClass() - + AccumuloIterator.FIELD_SEP + setting.getName()); - deprecatedIterators.add(deprecatedIter); - } - return deprecatedIterators; - } - - /** - * @deprecated since 1.5.0; Use {@link #getIterators(JobContext)} instead. - */ - @Deprecated - protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf) { - List<IteratorSetting> iteratorSettings = InputConfigurator.getIterators(CLASS, conf); - List<AccumuloIteratorOption> deprecatedIteratorOptions = new ArrayList<AccumuloIteratorOption>(iteratorSettings.size()); - for (IteratorSetting setting : iteratorSettings) { - for (Entry<String,String> opt : setting.getOptions().entrySet()) { - String deprecatedOption; - try { - deprecatedOption = setting.getName() + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getKey(), "UTF-8") - + AccumuloIteratorOption.FIELD_SEP + URLEncoder.encode(opt.getValue(), "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - deprecatedIteratorOptions.add(new AccumuloIteratorOption(deprecatedOption)); - } - } - return deprecatedIteratorOptions; - } - - /** - * @deprecated since 1.5.0; Use {@link IteratorSetting} instead. - */ - @Deprecated - static class AccumuloIterator { - - private static final String FIELD_SEP = ":"; - - private int priority; - private String iteratorClass; - private String iteratorName; - - public AccumuloIterator(int priority, String iteratorClass, String iteratorName) { - this.priority = priority; - this.iteratorClass = iteratorClass; - this.iteratorName = iteratorName; - } - - // Parses out a setting given an string supplied from an earlier toString() call - public AccumuloIterator(String iteratorSetting) { - // Parse the string to expand the iterator - StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP); - priority = Integer.parseInt(tokenizer.nextToken()); - iteratorClass = tokenizer.nextToken(); - iteratorName = tokenizer.nextToken(); - } - - public int getPriority() { - return priority; - } - - public String getIteratorClass() { - return iteratorClass; - } - - public String getIteratorName() { - return iteratorName; - } - - @Override - public String toString() { - return priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName; - } - - } - - /** - * @deprecated since 1.5.0; Use {@link IteratorSetting} instead. - */ - @Deprecated - static class AccumuloIteratorOption { - private static final String FIELD_SEP = ":"; - - private String iteratorName; - private String key; - private String value; - - public AccumuloIteratorOption(String iteratorName, String key, String value) { - this.iteratorName = iteratorName; - this.key = key; - this.value = value; - } - - // Parses out an option given a string supplied from an earlier toString() call - public AccumuloIteratorOption(String iteratorOption) { - StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP); - this.iteratorName = tokenizer.nextToken(); - try { - this.key = URLDecoder.decode(tokenizer.nextToken(), "UTF-8"); - this.value = URLDecoder.decode(tokenizer.nextToken(), "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - public String getIteratorName() { - return iteratorName; - } - - public String getKey() { - return key; - } - - public String getValue() { - return value; - } - - @Override - public String toString() { - try { - return iteratorName + FIELD_SEP + URLEncoder.encode(key, "UTF-8") + FIELD_SEP + URLEncoder.encode(value, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - } - - // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility - static Configuration getConfiguration(JobContext context) { - try { - Class<?> c = InputFormatBase.class.getClassLoader().loadClass("org.apache.hadoop.mapreduce.JobContext"); - Method m = c.getMethod("getConfiguration"); - Object o = m.invoke(context, new Object[0]); - return (Configuration) o; - } catch (Exception e) { - throw new RuntimeException(e); + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); } } + + /** + * @see org.apache.accumulo.core.client.mapreduce.RangeInputSplit + */ + @Deprecated + public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit { + + public RangeInputSplit() { + super(); + } + - public RangeInputSplit(Range range, String[] locations) { - super(range, locations); ++ public RangeInputSplit(RangeInputSplit other) throws IOException { ++ super(other); ++ } ++ ++ protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { ++ super(table, tableId, range, locations); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/86cafd97/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java ---------------------------------------------------------------------- diff --cc core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java index 8b2b020,440dbf7..7d3dde6 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/EmptySplitsAccumuloInputFormat.java @@@ -30,16 -30,8 +30,16 @@@ public class EmptySplitsAccumuloInputFo @Override public List<InputSplit> getSplits(JobContext context) throws IOException { - super.getSplits(context); + List<InputSplit> oldSplits = super.getSplits(context); + List<InputSplit> newSplits = new ArrayList<InputSplit>(oldSplits.size()); - return Arrays.<InputSplit> asList(new RangeInputSplit()); + // Copy only the necessary information + for (InputSplit oldSplit : oldSplits) { - RangeInputSplit newSplit = new RangeInputSplit((RangeInputSplit) oldSplit); ++ org.apache.accumulo.core.client.mapreduce.RangeInputSplit newSplit = new org.apache.accumulo.core.client.mapreduce.RangeInputSplit((org.apache.accumulo.core.client.mapreduce.RangeInputSplit) oldSplit); + newSplits.add(newSplit); + } + + + return newSplits; } }