Merge branch '1.5.1-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
    core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
    core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java
    core/src/main/java/org/apache/accumulo/core/security/CredentialHelper.java
    server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/aec43807
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/aec43807
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/aec43807

Branch: refs/heads/master
Commit: aec438077c9848ec95642b89755582d6ff1007fa
Parents: 7dd3592 3a703d7
Author: Josh Elser <els...@apache.org>
Authored: Sat Nov 23 01:56:04 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Sat Nov 23 01:56:04 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapred/AbstractInputFormat.java |  154 ++++--
 .../core/client/mapred/AccumuloInputFormat.java |   12 +
 .../core/client/mapred/InputFormatBase.java     |   14 +-
 .../core/client/mapred/RangeInputSplit.java     |   40 ++
 .../client/mapreduce/AbstractInputFormat.java   |  288 +++++------
 .../client/mapreduce/AccumuloInputFormat.java   |   11 +
 .../AccumuloMultiTableInputFormat.java          |    8 +-
 .../core/client/mapreduce/InputFormatBase.java  |   13 +-
 .../core/client/mapreduce/RangeInputSplit.java  |  486 +++++++++++++++++++
 .../mapreduce/lib/util/ConfiguratorBase.java    |    8 +-
 .../mapreduce/lib/util/InputConfigurator.java   |   24 +-
 .../AccumuloMultiTableInputFormatTest.java      |    6 +-
 .../mapreduce/AccumuloInputFormatTest.java      |  431 ++++++++++++++++
 .../AccumuloMultiTableInputFormatTest.java      |    2 +-
 .../client/mapreduce/RangeInputSplitTest.java   |   91 ++++
 15 files changed, 1351 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/aec43807/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index c0ef0b5,0000000..4e84909
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@@ -1,572 -1,0 +1,646 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.core.client.mapred; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; ++import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; ++import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.impl.OfflineScanner; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * An abstract input format to provide shared methods common to all other input format classes. At the very least, any classes inheriting from this class will + * need to define their own {@link RecordReader}. + */ +public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { + protected static final Class<?> CLASS = AccumuloInputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param token + * the user's password + * @throws org.apache.accumulo.core.client.AccumuloSecurityException + * @since 1.5.0 + */ + public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job, principal, token); + } + + /** + * Sets the connector information needed to communicate with Accumulo in this job. + * + * <p> + * Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param principal + * a valid Accumulo user name (user must have Table.CREATE permission) + * @param tokenFile + * the path to the token file + * @throws AccumuloSecurityException + * @since 1.6.0 + */ + public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { + InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); + } + + /** + * Determines if the connector has been configured. + * + * @param job + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static Boolean isConnectorInfoSet(JobConf job) { + return InputConfigurator.isConnectorInfoSet(CLASS, job); + } + + /** + * Gets the user name from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + */ + protected static String getPrincipal(JobConf job) { + return InputConfigurator.getPrincipal(CLASS, job); + } + + /** + * Gets the serialized token class from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead. + */ + @Deprecated + protected static String getTokenClass(JobConf job) { + return getAuthenticationToken(job).getClass().getName(); + } + + /** + * Gets the serialized token from either the configuration or the token file. + * + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #getAuthenticationToken(JobConf)} instead. + */ + @Deprecated + protected static byte[] getToken(JobConf job) { + return AuthenticationToken.AuthenticationTokenSerializer.serialize(getAuthenticationToken(job)); + } + + /** + * Gets the authenticated token from either the specified token file or directly from the configuration, whichever was used when the job was configured. + * + * @param job + * the Hadoop context for the configured job + * @return the principal's authentication token + * @since 1.6.0 + * @see #setConnectorInfo(JobConf, String, AuthenticationToken) + * @see #setConnectorInfo(JobConf, String, String) + */ + protected static AuthenticationToken getAuthenticationToken(JobConf job) { + return InputConfigurator.getAuthenticationToken(CLASS, job); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 + * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + */ + @Deprecated + public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { + InputConfigurator.setZooKeeperInstance(CLASS, job, instanceName, zooKeepers); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param clientConfig + * client configuration containing connection options + * @since 1.6.0 + */ + public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { + InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); + } + + /** + * Configures a {@link org.apache.accumulo.core.client.mock.MockInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(JobConf job, String instanceName) { + InputConfigurator.setMockInstance(CLASS, job, instanceName); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.Instance} based on the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(JobConf, String, String) + * @see #setMockInstance(JobConf, String) + */ + protected static Instance getInstance(JobConf job) { + return InputConfigurator.getInstance(CLASS, job); + } + + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(JobConf job, Level level) { + InputConfigurator.setLogLevel(CLASS, job, level); + } + + /** + * Gets the log level from this configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(JobConf, Level) + */ + protected static Level getLogLevel(JobConf job) { + return InputConfigurator.getLogLevel(CLASS, job); + } + + /** + * Sets the {@link org.apache.accumulo.core.security.Authorizations} used to scan. Must be a subset of the user's authorization. Defaults to the empty set. + * + * @param job + * the Hadoop job instance to be configured + * @param auths + * the user's authorizations + * @since 1.5.0 + */ + public static void setScanAuthorizations(JobConf job, Authorizations auths) { + InputConfigurator.setScanAuthorizations(CLASS, job, auths); + } + + /** + * Gets the authorizations to set for the scans from the configuration. + * + * @param job + * the Hadoop context for the configured job + * @return the Accumulo scan authorizations + * @since 1.5.0 + * @see #setScanAuthorizations(JobConf, Authorizations) + */ + protected static Authorizations getScanAuthorizations(JobConf job) { + return InputConfigurator.getScanAuthorizations(CLASS, job); + } + + /** + * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. 
+ * + * @param job + * the Hadoop context for the configured job + * @return an Accumulo tablet locator + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the configuration doesn't exist + * @since 1.6.0 + */ + protected static TabletLocator getTabletLocator(JobConf job, String tableId) throws TableNotFoundException { + return InputConfigurator.getTabletLocator(CLASS, job, tableId); + } + + // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) + /** + * Check whether a configuration is fully configured to be used with an Accumulo {@link InputFormat}. + * + * @param job + * the Hadoop context for the configured job + * @throws java.io.IOException + * if the context is improperly configured + * @since 1.5.0 + */ + protected static void validateOptions(JobConf job) throws IOException { + InputConfigurator.validateOptions(CLASS, job); + } + + /** + * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job. + * + * @param job + * the Hadoop job instance to be configured + * @return the {@link InputTableConfig} objects set on the job + * @since 1.6.0 + */ + public static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) { + return InputConfigurator.getInputTableConfigs(CLASS, job); + } + + /** + * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific table. + * + * <p> + * null is returned in the event that the table doesn't exist. + * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table name for which to grab the config object + * @return the {@link InputTableConfig} for the given table + * @since 1.6.0 + */ - public static InputTableConfig getInputTableConfigs(JobConf job, String tableName) { ++ public static InputTableConfig getInputTableConfig(JobConf job, String tableName) { + return InputConfigurator.getInputTableConfig(CLASS, job, tableName); + } + + /** + * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader} instances that convert from Accumulo + * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to the user's K/V types. + * + * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables: + * <ul> + * <li>Key {@link #currentKey} (used for progress reporting)</li> + * <li>int {@link #numKeysRead} (used for progress reporting)</li> + * </ul> + */ + protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> { + protected long numKeysRead; + protected Iterator<Map.Entry<Key,Value>> scannerIterator; + protected RangeInputSplit split; + + /** + * Configures the iterators on a scanner for the given table name. + * + * @param job + * the Hadoop job configuration + * @param scanner + * the scanner for which to configure the iterators + * @param tableName + * the table name for which the scanner is configured + * @since 1.6.0 + */ - protected abstract void setupIterators(JobConf job, Scanner scanner, String tableName); ++ protected abstract void setupIterators(JobConf job, Scanner scanner, String tableName, RangeInputSplit split); + + /** + * Initialize a scanner over the given input split using this task attempt configuration. 
+ */ + public void initialize(InputSplit inSplit, JobConf job) throws IOException { + Scanner scanner; + split = (RangeInputSplit) inSplit; + log.debug("Initializing input split: " + split.getRange()); - Instance instance = getInstance(job); - String principal = getPrincipal(job); - AuthenticationToken token = getAuthenticationToken(job); - Authorizations authorizations = getScanAuthorizations(job); ++ ++ Instance instance = split.getInstance(); ++ if (null == instance) { ++ instance = getInstance(job); ++ } ++ ++ String principal = split.getPrincipal(); ++ if (null == principal) { ++ principal = getPrincipal(job); ++ } ++ ++ AuthenticationToken token = split.getToken(); ++ if (null == token) { ++ token = getAuthenticationToken(job); ++ } ++ ++ Authorizations authorizations = split.getAuths(); ++ if (null == authorizations) { ++ authorizations = getScanAuthorizations(job); ++ } ++ ++ String table = split.getTableName(); + + // in case the table name changed, we can still use the previous name for terms of configuration, + // but the scanner will use the table id resolved at job setup time - InputTableConfig tableConfig = getInputTableConfigs(job, split.getTableName()); ++ InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName()); ++ ++ Boolean isOffline = split.isOffline(); ++ if (null == isOffline) { ++ isOffline = tableConfig.isOfflineScan(); ++ } ++ ++ Boolean isIsolated = split.isIsolatedScan(); ++ if (null == isIsolated) { ++ isIsolated = tableConfig.shouldUseIsolatedScanners(); ++ } ++ ++ Boolean usesLocalIterators = split.usesLocalIterators(); ++ if (null == usesLocalIterators) { ++ usesLocalIterators = tableConfig.shouldUseLocalIterators(); ++ } ++ ++ List<IteratorSetting> iterators = split.getIterators(); ++ if (null == iterators) { ++ iterators = tableConfig.getIterators(); ++ } ++ ++ Collection<Pair<Text,Text>> columns = split.getFetchedColumns(); ++ if (null == columns) { ++ columns = tableConfig.getFetchedColumns(); ++ } + + try { + log.debug("Creating connector with user: " + principal); - log.debug("Creating scanner for table: " + split.getTableName()); ++ log.debug("Creating scanner for table: " + table); + log.debug("Authorizations are: " + authorizations); - if (tableConfig.isOfflineScan()) { ++ if (isOffline) { + scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); + } else if (instance instanceof MockInstance) { + scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations); + } else { + scanner = new ScannerImpl(instance, new Credentials(principal, token), split.getTableId(), authorizations); + } - if (tableConfig.shouldUseIsolatedScanners()) { ++ if (isIsolated) { + log.info("Creating isolated scanner"); + scanner = new IsolatedScanner(scanner); + } - if (tableConfig.shouldUseLocalIterators()) { ++ if (usesLocalIterators) { + log.info("Using local iterators"); + scanner = new ClientSideIteratorScanner(scanner); + } - setupIterators(job, scanner, split.getTableName()); ++ setupIterators(job, scanner, split.getTableName(), split); + } catch (Exception e) { + throw new IOException(e); + } + + // setup a scanner within the bounds of this split + for (Pair<Text,Text> c : tableConfig.getFetchedColumns()) { + if (c.getSecond() != null) { + log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); + scanner.fetchColumn(c.getFirst(), c.getSecond()); + } else { + log.debug("Fetching column family " + c.getFirst()); + 
scanner.fetchColumnFamily(c.getFirst()); + } + } + + scanner.setRange(split.getRange()); + + numKeysRead = 0; + + // do this last after setting all scanner options + scannerIterator = scanner.iterator(); + } + + @Override + public void close() {} + + @Override + public long getPos() throws IOException { + return numKeysRead; + } + + @Override + public float getProgress() throws IOException { + if (numKeysRead > 0 && currentKey == null) + return 1.0f; + return split.getProgress(currentKey); + } + + protected Key currentKey = null; + + } + + Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobConf job, String tableId, List<Range> ranges) throws TableNotFoundException, AccumuloException, + AccumuloSecurityException { + + Instance instance = getInstance(job); + Connector conn = instance.getConnector(getPrincipal(job), getAuthenticationToken(job)); + + return InputConfigurator.binOffline(tableId, ranges, instance, conn); + } + + /** + * Read the metadata table to get tablets and match up ranges to them. + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - log.setLevel(getLogLevel(job)); ++ Level logLevel = getLogLevel(job); ++ log.setLevel(logLevel); + validateOptions(job); + + LinkedList<InputSplit> splits = new LinkedList<InputSplit>(); + Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job); + for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { + String tableName = tableConfigEntry.getKey(); + InputTableConfig tableConfig = tableConfigEntry.getValue(); ++ ++ Instance instance = getInstance(job); ++ boolean mockInstance; ++ String tableId; ++ // resolve table name to id once, and use id from this point forward ++ if (instance instanceof MockInstance) { ++ tableId = ""; ++ mockInstance = true; ++ } else { ++ try { ++ tableId = Tables.getTableId(instance, tableName); ++ } catch (TableNotFoundException e) { ++ throw new IOException(e); ++ } ++ mockInstance = false; ++ } ++ ++ Authorizations auths = getScanAuthorizations(job); ++ String principal = getPrincipal(job); ++ AuthenticationToken token = getAuthenticationToken(job); ++ + boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); - String tableId = null; + List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); + if (ranges.isEmpty()) { + ranges = new ArrayList<Range>(1); + ranges.add(new Range()); + } + + // get the metadata information for these ranges + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); + TabletLocator tl; + try { - // resolve table name to id once, and use id from this point forward - Instance instance = getInstance(job); - if (instance instanceof MockInstance) - tableId = ""; - else - tableId = Tables.getTableId(instance, tableName); + if (tableConfig.isOfflineScan()) { + binnedRanges = binOfflineTable(job, tableId, ranges); + while (binnedRanges == null) { + // Some tablets were still online, try again + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + binnedRanges = binOfflineTable(job, tableId, ranges); + } + } else { + tl = getTabletLocator(job, tableId); + // its possible that the cache could contain complete, but old information about a tables tablets... 
so clear it + tl.invalidateCache(); + Credentials creds = new Credentials(getPrincipal(job), getAuthenticationToken(job)); + + while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { + if (!(instance instanceof MockInstance)) { + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + } + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. Retrying."); + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + tl.invalidateCache(); + } + } + } catch (Exception e) { + throw new IOException(e); + } + + HashMap<Range,ArrayList<String>> splitsToAdd = null; + + if (!autoAdjust) + splitsToAdd = new HashMap<Range,ArrayList<String>>(); + + HashMap<String,String> hostNameCache = new HashMap<String,String>(); + for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { + String ip = tserverBin.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getHostName(); + hostNameCache.put(ip, location); + } + for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { + Range ke = extentRanges.getKey().toDataRange(); + for (Range r : extentRanges.getValue()) { + if (autoAdjust) { + // divide ranges into smaller ranges, based on the tablets - splits.add(new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location})); ++ RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); ++ ++ split.setOffline(tableConfig.isOfflineScan()); ++ split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); ++ split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); ++ split.setMockInstance(mockInstance); ++ split.setFetchedColumns(tableConfig.getFetchedColumns()); ++ split.setPrincipal(principal); ++ split.setToken(token); ++ split.setInstanceName(instance.getInstanceName()); ++ split.setZooKeepers(instance.getZooKeepers()); ++ split.setAuths(auths); ++ split.setIterators(tableConfig.getIterators()); ++ split.setLogLevel(logLevel); ++ ++ splits.add(split); + } else { + // don't divide ranges + ArrayList<String> locations = splitsToAdd.get(r); + if (locations == null) + locations = new ArrayList<String>(1); + locations.add(location); + splitsToAdd.put(r, locations); + } + } + } + } + + if (!autoAdjust) - for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) - splits.add(new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0]))); ++ for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { ++ RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); ++ ++ split.setOffline(tableConfig.isOfflineScan()); ++ split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); ++ split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); ++ split.setMockInstance(mockInstance); ++ split.setFetchedColumns(tableConfig.getFetchedColumns()); ++ split.setPrincipal(principal); ++ split.setToken(token); ++ split.setInstanceName(instance.getInstanceName()); ++ split.setZooKeepers(instance.getZooKeepers()); ++ split.setAuths(auths); ++ split.setIterators(tableConfig.getIterators()); ++ split.setLogLevel(logLevel); ++ ++ 
splits.add(split); ++ } + } + + return splits.toArray(new InputSplit[splits.size()]); + } + - /** - * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. - */ - public static class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit implements InputSplit { - - public RangeInputSplit() { - super(); - } - - public RangeInputSplit(RangeInputSplit split) throws IOException { - super(split); - } - - protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { - super(table, tableId, range, locations); - } - } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/aec43807/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java index 917b71d,534a095..135791e --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@@ -19,7 -19,6 +19,8 @@@ package org.apache.accumulo.core.client import java.io.IOException; import java.util.Map.Entry; +import org.apache.accumulo.core.client.ClientConfiguration; ++import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@@ -30,6 -29,6 +31,7 @@@ import org.apache.hadoop.mapred.InputSp import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; ++import org.apache.log4j.Level; /** * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and @@@ -52,8 -49,8 +54,18 @@@ public class AccumuloInputFormat extend @Override public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { log.setLevel(getLogLevel(job)); ++ ++ // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. 
++ if (split instanceof RangeInputSplit) { ++ RangeInputSplit risplit = (RangeInputSplit) split; ++ Level level = risplit.getLogLevel(); ++ if (null != level) { ++ log.setLevel(level); ++ } ++ } ++ RecordReaderBase<Key,Value> recordReader = new RecordReaderBase<Key,Value>() { - + @Override public boolean next(Key key, Value value) throws IOException { if (scannerIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/aec43807/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java index b368bf3,0be4706..b6a8258 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java @@@ -17,21 -17,49 +17,22 @@@ package org.apache.accumulo.core.client.mapred; import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientSideIteratorScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.impl.OfflineScanner; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; + import org.apache.accumulo.core.client.mapred.RangeInputSplit; import org.apache.accumulo.core.client.mapreduce.lib.util.InputConfigurator; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.CredentialHelper; -import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@@ -295,30 -495,50 +296,30 @@@ public abstract class InputFormatBase<K protected static boolean isOfflineScan(JobConf job) { return InputConfigurator.isOfflineScan(CLASS, job); } - + /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. 
+ * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. * * @param job - * the Hadoop context for the configured job + * the Hadoop job for the configured job * @return an Accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist + * @throws org.apache.accumulo.core.client.TableNotFoundException + * if the table name set on the job doesn't exist * @since 1.5.0 + * @deprecated since 1.6.0 */ + @Deprecated protected static TabletLocator getTabletLocator(JobConf job) throws TableNotFoundException { - return InputConfigurator.getTabletLocator(CLASS, job); + return InputConfigurator.getTabletLocator(CLASS, job, InputConfigurator.getInputTableName(CLASS, job)); } - - // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) - /** - * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}. - * - * @param job - * the Hadoop context for the configured job - * @throws IOException - * if the context is improperly configured - * @since 1.5.0 - */ - protected static void validateOptions(JobConf job) throws IOException { - InputConfigurator.validateOptions(CLASS, job); - } - - /** - * An abstract base class to be used to create {@link RecordReader} instances that convert from Accumulo {@link Key}/{@link Value} pairs to the user's K/V - * types. - * - * Subclasses must implement {@link #next(Object, Object)} to update key and value, and also to update the following variables: - * <ul> - * <li>Key {@link #currentKey} (used for progress reporting)</li> - * <li>int {@link #numKeysRead} (used for progress reporting)</li> - * </ul> - */ - protected abstract static class RecordReaderBase<K,V> implements RecordReader<K,V> { - protected long numKeysRead; - protected Iterator<Entry<Key,Value>> scannerIterator; - protected RangeInputSplit split; - + + protected abstract static class RecordReaderBase<K,V> extends AbstractRecordReader<K,V> { + + @Override - protected void setupIterators(JobConf job, Scanner scanner, String tableName) { - setupIterators(job, scanner); ++ protected void setupIterators(JobConf job, Scanner scanner, String tableName, RangeInputSplit split) { ++ setupIterators(job, scanner, split); + } + /** * Apply the configured iterators from the configuration to the scanner. * @@@ -327,11 -547,367 +328,16 @@@ * @param scanner * the scanner to configure */ - protected void setupIterators(JobConf job, Scanner scanner) { - List<IteratorSetting> iterators = getIterators(job); - protected void setupIterators(List<IteratorSetting> iterators, Scanner scanner) { - for (IteratorSetting iterator : iterators) { - scanner.addScanIterator(iterator); - } - } - - /** - * Initialize a scanner over the given input split using this task attempt configuration. 
- */ - public void initialize(InputSplit inSplit, JobConf job) throws IOException { - Scanner scanner; - split = (RangeInputSplit) inSplit; - log.debug("Initializing input split: " + split.getRange()); - - Instance instance = split.getInstance(); - if (null == instance) { - instance = getInstance(job); - } - - String principal = split.getPrincipal(); - if (null == principal) { - principal = getPrincipal(job); - } - - AuthenticationToken token = split.getToken(); - if (null == token) { - String tokenClass = getTokenClass(job); - byte[] tokenBytes = getToken(job); - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - } - - Authorizations authorizations = split.getAuths(); - if (null == authorizations) { - authorizations = getScanAuthorizations(job); - } - - String table = split.getTable(); - if (null == table) { - table = getInputTableName(job); - } - - Boolean isOffline = split.isOffline(); - if (null == isOffline) { - isOffline = isOfflineScan(job); - } - - Boolean isIsolated = split.isIsolatedScan(); - if (null == isIsolated) { - isIsolated = isIsolated(job); - } - - Boolean usesLocalIterators = split.usesLocalIterators(); - if (null == usesLocalIterators) { - usesLocalIterators = usesLocalIterators(job); - } - ++ protected void setupIterators(JobConf job, Scanner scanner, RangeInputSplit split) { + List<IteratorSetting> iterators = split.getIterators(); ++ + if (null == iterators) { + iterators = getIterators(job); + } + - Set<Pair<Text,Text>> columns = split.getFetchedColumns(); - if (null == columns) { - columns = getFetchedColumns(job); - } - - try { - log.debug("Creating connector with user: " + principal); - Connector conn = instance.getConnector(principal, token); - log.debug("Creating scanner for table: " + table); - log.debug("Authorizations are: " + authorizations); - if (isOfflineScan(job)) { - String tokenClass = token.getClass().getCanonicalName(); - ByteBuffer tokenBuffer = ByteBuffer.wrap(CredentialHelper.toBytes(token)); - scanner = new OfflineScanner(instance, new TCredentials(principal, tokenClass, tokenBuffer, instance.getInstanceID()), Tables.getTableId( - instance, table), authorizations); - } else { - scanner = conn.createScanner(table, authorizations); - } - if (isIsolated) { - log.info("Creating isolated scanner"); - scanner = new IsolatedScanner(scanner); - } - if (usesLocalIterators) { - log.info("Using local iterators"); - scanner = new ClientSideIteratorScanner(scanner); - } - setupIterators(iterators, scanner); - } catch (Exception e) { - throw new IOException(e); - } - - // setup a scanner within the bounds of this split - for (Pair<Text,Text> c : columns) { - if (c.getSecond() != null) { - log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond()); - scanner.fetchColumn(c.getFirst(), c.getSecond()); - } else { - log.debug("Fetching column family " + c.getFirst()); - scanner.fetchColumnFamily(c.getFirst()); - } - } - - scanner.setRange(split.getRange()); - - numKeysRead = 0; - - // do this last after setting all scanner options - scannerIterator = scanner.iterator(); - } - - @Override - public void close() {} - - @Override - public long getPos() throws IOException { - return numKeysRead; - } - - @Override - public float getProgress() throws IOException { - if (numKeysRead > 0 && currentKey == null) - return 1.0f; - return split.getProgress(currentKey); - } - - protected Key currentKey = null; - - } - - Map<String,Map<KeyExtent,List<Range>>> 
binOfflineTable(JobConf job, String tableName, List<Range> ranges) throws TableNotFoundException, AccumuloException, - AccumuloSecurityException { - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - - Instance instance = getInstance(job); - Connector conn = instance.getConnector(getPrincipal(job), CredentialHelper.extractToken(getTokenClass(job), getToken(job))); - String tableId = Tables.getTableId(instance, tableName); - - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode "); - } - } - - for (Range range : ranges) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - Range metadataRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); - Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); - scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY); - scanner.setRange(metadataRange); - - RowIterator rowIter = new RowIterator(scanner); - - KeyExtent lastExtent = null; - - while (rowIter.hasNext()) { - Iterator<Entry<Key,Value>> row = rowIter.next(); - String last = ""; - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY)) { - last = entry.getValue().toString(); - } - - if (key.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) - || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) { - location = entry.getValue().toString(); - } - - if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - - if (location != null) - return null; - - if (!extent.getTableId().toString().equals(tableId)) { - throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); - } - - if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { - throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent); - } - - Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last); - if (tabletRanges == null) { - tabletRanges = new HashMap<KeyExtent,List<Range>>(); - binnedRanges.put(last, tabletRanges); - } - - List<Range> rangeList = tabletRanges.get(extent); - if (rangeList == null) { - rangeList = new ArrayList<Range>(); - tabletRanges.put(extent, rangeList); - } - - rangeList.add(range); - - if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) { - break; - } - - lastExtent = extent; - } - - } - - return binnedRanges; - } - - /** - * Read the metadata table to get tablets and match up ranges to them. 
- */ - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - Level logLevel = getLogLevel(job); - log.setLevel(logLevel); - - validateOptions(job); - - String tableName = getInputTableName(job); - boolean autoAdjust = getAutoAdjustRanges(job); - List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job); - Instance instance = getInstance(job); - boolean offline = isOfflineScan(job); - boolean isolated = isIsolated(job); - boolean localIterators = usesLocalIterators(job); - boolean mockInstance = (null != instance && MockInstance.class.equals(instance.getClass())); - Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(job); - Authorizations auths = getScanAuthorizations(job); - String principal = getPrincipal(job); - String tokenClass = getTokenClass(job); - byte[] tokenBytes = getToken(job); - - AuthenticationToken token; - try { - token = CredentialHelper.extractToken(tokenClass, tokenBytes); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } - - List<IteratorSetting> iterators = getIterators(job); - - if (ranges.isEmpty()) { - ranges = new ArrayList<Range>(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - TabletLocator tl; - try { - if (isOfflineScan(job)) { - binnedRanges = binOfflineTable(job, tableName, ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - binnedRanges = binOfflineTable(job, tableName, ranges); - } - } else { - String tableId = null; - tl = getTabletLocator(job); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - while (!tl.binRanges(ranges, binnedRanges, new TCredentials(principal, tokenClass, ByteBuffer.wrap(tokenBytes), instance.getInstanceID())).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (tableId == null) - tableId = Tables.getTableId(instance, tableName); - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. 
Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - tl.invalidateCache(); - } - } - } catch (Exception e) { - throw new IOException(e); - } - - ArrayList<RangeInputSplit> splits = new ArrayList<RangeInputSplit>(ranges.size()); - HashMap<Range,ArrayList<String>> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap<Range,ArrayList<String>>(); - - HashMap<String,String> hostNameCache = new HashMap<String,String>(); - - for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - - for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - splits.add(new RangeInputSplit(ke.clip(r), new String[] {location})); - } else { - // don't divide ranges - ArrayList<String> locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList<String>(1); - locations.add(location); - splitsToAdd.put(r, locations); - } - } - } - } - - if (!autoAdjust) - for (Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) - splits.add(new RangeInputSplit(entry.getKey(), entry.getValue().toArray(new String[0]))); - - for (RangeInputSplit split : splits) { - split.setTable(tableName); - split.setOffline(offline); - split.setIsolatedScan(isolated); - split.setUsesLocalIterators(localIterators); - split.setMockInstance(mockInstance); - split.setFetchedColumns(fetchedColumns); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(iterators); - split.setLogLevel(logLevel); + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); } - - return splits.toArray(new InputSplit[splits.size()]); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/aec43807/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java index 0000000,b35cef5..3fd2ab0 mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java @@@ -1,0 -1,35 +1,40 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.core.client.mapred; + ++import java.io.IOException; ++ + import org.apache.accumulo.core.data.Range; + import org.apache.hadoop.mapred.InputSplit; + + /** + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs. + */ + public class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.RangeInputSplit implements InputSplit { + + public RangeInputSplit() { + super(); + } + - public RangeInputSplit(Range range, String[] locations) { - super(range, locations); ++ public RangeInputSplit(RangeInputSplit split) throws IOException { ++ super(split); ++ } ++ ++ protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { ++ super(table, tableId, range, locations); + } - + }
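A minimal driver sketch (not from this commit; the user, password, instance, ZooKeeper host, and table
names are illustrative, and setInputTableName comes from the existing single-table InputFormatBase rather
than this diff) of how the mapred AccumuloInputFormat configured by the methods above might be wired into
a JobConf. The substance of the change is that getSplits() now copies the connection and scan settings
(principal, token, instance, authorizations, iterators, log level, and the offline/isolated/local-iterator
flags) onto each RangeInputSplit, and the record reader prefers the split's values over the job
configuration when both are present.

import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Level;

public class ExampleAccumuloScanDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(ExampleAccumuloScanDriver.class);
    job.setInputFormat(AccumuloInputFormat.class);

    // Connector info is BASE64-serialized into the configuration; after this change it is
    // also carried on every RangeInputSplit produced by getSplits().
    AccumuloInputFormat.setConnectorInfo(job, "exampleUser", new PasswordToken("examplePassword"));

    // 1.6.0-style instance configuration using a ClientConfiguration.
    AccumuloInputFormat.setZooKeeperInstance(job,
        ClientConfiguration.loadDefault().withInstance("exampleInstance").withZkHosts("zkhost:2181"));

    AccumuloInputFormat.setInputTableName(job, "exampleTable");
    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());

    // The log level is stored per split as well; the record reader uses the split's level when
    // one is present and falls back to the job configuration otherwise.
    AccumuloInputFormat.setLogLevel(job, Level.INFO);

    // ... configure the mapper, output format, and output path, then submit the job.
  }
}

Carrying this state on the split means a map task can construct its scanner from the split alone, which is
what the initialize(...) changes in AbstractInputFormat above rely on when they fall back to the job
configuration only for values the split does not provide.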