ACCUMULO-3602 ACCUMULO-3657 Minimize AccumuloInputSplit in API
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d1e6e79c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d1e6e79c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d1e6e79c Branch: refs/heads/1.7 Commit: d1e6e79cf12ee420dd1d20fd605723f0e5505f68 Parents: c625291 Author: Keith Turner <ktur...@apache.org> Authored: Tue Apr 21 17:30:48 2015 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Tue Apr 21 17:53:32 2015 -0400 ---------------------------------------------------------------------- .../core/client/mapred/AbstractInputFormat.java | 58 ++++++++-------- .../client/mapreduce/AbstractInputFormat.java | 56 ++++++++------- .../core/client/mapreduce/RangeInputSplit.java | 24 ++----- .../mapreduce/impl/AccumuloInputSplit.java | 73 +++++++++----------- .../client/mapreduce/impl/BatchInputSplit.java | 21 ++---- .../core/client/mapreduce/impl/SplitUtils.java | 59 ++++++++++++++++ 6 files changed, 162 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java index b97d4de..f2e3a79 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -29,18 +29,18 @@ import java.util.Random; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; @@ -54,6 +54,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.mapred.impl.BatchInputSplit; import org.apache.accumulo.core.client.mapreduce.InputTableConfig; import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit; +import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; @@ -394,7 +395,8 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { protected abstract static class AbstractRecordReader<K,V> implements RecordReader<K,V> { protected long numKeysRead; protected Iterator<Map.Entry<Key,Value>> scannerIterator; - protected org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit split; + protected RangeInputSplit split; + private org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit aiSplit; protected ScannerBase scannerBase; @@ -458,42 +460,42 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { * Initialize a scanner over the given input split using this task attempt configuration. */ public void initialize(InputSplit inSplit, JobConf job) throws IOException { - split = (AccumuloInputSplit) inSplit; - log.debug("Initializing input split: " + split.toString()); + aiSplit = (AccumuloInputSplit) inSplit; + log.debug("Initializing input split: " + aiSplit.toString()); - Instance instance = split.getInstance(getClientConfiguration(job)); + Instance instance = aiSplit.getInstance(getClientConfiguration(job)); if (null == instance) { instance = getInstance(job); } - String principal = split.getPrincipal(); + String principal = aiSplit.getPrincipal(); if (null == principal) { principal = getPrincipal(job); } - AuthenticationToken token = split.getToken(); + AuthenticationToken token = aiSplit.getToken(); if (null == token) { token = getAuthenticationToken(job); } - Authorizations authorizations = split.getAuths(); + Authorizations authorizations = aiSplit.getAuths(); if (null == authorizations) { authorizations = getScanAuthorizations(job); } - String table = split.getTableName(); + String table = aiSplit.getTableName(); // in case the table name changed, we can still use the previous name for terms of configuration, // but the scanner will use the table id resolved at job setup time - InputTableConfig tableConfig = getInputTableConfig(job, split.getTableName()); + InputTableConfig tableConfig = getInputTableConfig(job, aiSplit.getTableName()); log.debug("Creating connector with user: " + principal); log.debug("Creating scanner for table: " + table); log.debug("Authorizations are: " + authorizations); - if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) { - org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split; - + if (aiSplit instanceof RangeInputSplit) { + RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit; + split = rangeSplit; Boolean isOffline = rangeSplit.isOffline(); if (null == isOffline) { isOffline = tableConfig.isOfflineScan(); @@ -513,13 +515,13 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { try { if (isOffline) { - scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); + scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(), authorizations); } else if (instance instanceof MockInstance) { - scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations); + scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(), authorizations); } else { ClientConfiguration clientConf = getClientConfiguration(job); ClientContext context = new ClientContext(instance, new Credentials(principal, token), clientConf); - scanner = new ScannerImpl(context, split.getTableId(), authorizations); + scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations); } if (isIsolated) { log.info("Creating isolated scanner"); @@ -529,7 +531,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { log.info("Using local iterators"); scanner = new ClientSideIteratorScanner(scanner); } - setupIterators(job, scanner, split.getTableName(), split); + setupIterators(job, scanner, aiSplit.getTableName(), aiSplit); } catch (Exception e) { throw new IOException(e); } @@ -537,15 +539,15 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { scanner.setRange(rangeSplit.getRange()); scannerBase = scanner; - } else if (split instanceof BatchInputSplit) { + } else if (aiSplit instanceof BatchInputSplit) { BatchScanner scanner; - BatchInputSplit multiRangeSplit = (BatchInputSplit) split; + BatchInputSplit multiRangeSplit = (BatchInputSplit) aiSplit; try{ // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit will not span tablets int scanThreads = 1; - scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(), authorizations, scanThreads); - setupIterators(job, scanner, split.getTableName(), split); + scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(), authorizations, scanThreads); + setupIterators(job, scanner, aiSplit.getTableName(), aiSplit); } catch (Exception e) { throw new IOException(e); } @@ -554,10 +556,10 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { scannerBase = scanner; } else { - throw new IllegalArgumentException("Can not initialize from " + split.getClass().toString()); + throw new IllegalArgumentException("Can not initialize from " + aiSplit.getClass().toString()); } - Collection<Pair<Text,Text>> columns = split.getFetchedColumns(); + Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns(); if (null == columns) { columns = tableConfig.getFetchedColumns(); } @@ -593,7 +595,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { public float getProgress() throws IOException { if (numKeysRead > 0 && currentKey == null) return 1.0f; - return split.getProgress(currentKey); + return aiSplit.getProgress(currentKey); } protected Key currentKey = null; @@ -721,7 +723,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { clippedRanges.add(ke.clip(r)); BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location}); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); splits.add(split); } else { @@ -730,7 +732,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { if (autoAdjust) { // divide ranges into smaller ranges, based on the tablets RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); @@ -752,7 +754,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { if (!autoAdjust) for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index c7a304c..d402bb0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -29,13 +29,14 @@ import java.util.Random; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -52,6 +53,7 @@ import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit; import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit; +import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; @@ -67,7 +69,6 @@ import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.client.IteratorSetting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; @@ -425,7 +426,8 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { protected long numKeysRead; protected Iterator<Map.Entry<Key,Value>> scannerIterator; protected ScannerBase scannerBase; - protected AccumuloInputSplit split; + protected RangeInputSplit split; + private AccumuloInputSplit aiSplit; /** * Extracts Iterators settings from the context to be used by RecordReader. @@ -489,41 +491,42 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { @Override public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { - split = (AccumuloInputSplit) inSplit; - log.debug("Initializing input split: " + split.toString()); + aiSplit = (AccumuloInputSplit) inSplit; + log.debug("Initializing input split: " + aiSplit.toString()); - Instance instance = split.getInstance(getClientConfiguration(attempt)); + Instance instance = aiSplit.getInstance(getClientConfiguration(attempt)); if (null == instance) { instance = getInstance(attempt); } - String principal = split.getPrincipal(); + String principal = aiSplit.getPrincipal(); if (null == principal) { principal = getPrincipal(attempt); } - AuthenticationToken token = split.getToken(); + AuthenticationToken token = aiSplit.getToken(); if (null == token) { token = getAuthenticationToken(attempt); } - Authorizations authorizations = split.getAuths(); + Authorizations authorizations = aiSplit.getAuths(); if (null == authorizations) { authorizations = getScanAuthorizations(attempt); } - String table = split.getTableName(); + String table = aiSplit.getTableName(); // in case the table name changed, we can still use the previous name for terms of configuration, // but the scanner will use the table id resolved at job setup time - InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName()); + InputTableConfig tableConfig = getInputTableConfig(attempt, aiSplit.getTableName()); log.debug("Creating connector with user: " + principal); log.debug("Creating scanner for table: " + table); log.debug("Authorizations are: " + authorizations); - if (split instanceof RangeInputSplit) { - RangeInputSplit rangeSplit = (RangeInputSplit) split; + if (aiSplit instanceof RangeInputSplit) { + RangeInputSplit rangeSplit = (RangeInputSplit) aiSplit; + split = rangeSplit; Scanner scanner; Boolean isOffline = rangeSplit.isOffline(); @@ -543,13 +546,13 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { try { if (isOffline) { - scanner = new OfflineScanner(instance, new Credentials(principal, token), split.getTableId(), authorizations); + scanner = new OfflineScanner(instance, new Credentials(principal, token), aiSplit.getTableId(), authorizations); } else if (instance instanceof MockInstance) { - scanner = instance.getConnector(principal, token).createScanner(split.getTableName(), authorizations); + scanner = instance.getConnector(principal, token).createScanner(aiSplit.getTableName(), authorizations); } else { ClientConfiguration clientConf = getClientConfiguration(attempt); ClientContext context = new ClientContext(instance, new Credentials(principal, token), clientConf); - scanner = new ScannerImpl(context, split.getTableId(), authorizations); + scanner = new ScannerImpl(context, aiSplit.getTableId(), authorizations); } if (isIsolated) { log.info("Creating isolated scanner"); @@ -560,7 +563,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { scanner = new ClientSideIteratorScanner(scanner); } - setupIterators(attempt, scanner, split.getTableName(), split); + setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit); } catch (Exception e) { throw new IOException(e); } @@ -568,16 +571,17 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { scanner.setRange(rangeSplit.getRange()); scannerBase = scanner; - } else if (split instanceof BatchInputSplit) { - BatchInputSplit batchSplit = (BatchInputSplit) split; + } else if (aiSplit instanceof BatchInputSplit) { + BatchInputSplit batchSplit = (BatchInputSplit) aiSplit; BatchScanner scanner; try{ // Note: BatchScanner will use at most one thread per tablet, currently BatchInputSplit will not span tablets int scanThreads = 1; - scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(), authorizations, scanThreads); - setupIterators(attempt, scanner, split.getTableName(), split); + scanner = instance.getConnector(principal, token).createBatchScanner(aiSplit.getTableName(), authorizations, scanThreads); + setupIterators(attempt, scanner, aiSplit.getTableName(), aiSplit); } catch (Exception e) { + e.printStackTrace(); throw new IOException(e); } @@ -585,7 +589,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { scannerBase = scanner; } - Collection<Pair<Text,Text>> columns = split.getFetchedColumns(); + Collection<Pair<Text,Text>> columns = aiSplit.getFetchedColumns(); if (null == columns) { columns = tableConfig.getFetchedColumns(); } @@ -616,7 +620,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { public float getProgress() throws IOException { if (numKeysRead > 0 && currentKey == null) return 1.0f; - return split.getProgress(currentKey); + return aiSplit.getProgress(currentKey); } /** @@ -767,7 +771,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { for(Range r: extentRanges.getValue()) clippedRanges.add(ke.clip(r)); BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location}); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); splits.add(split); } else { @@ -776,7 +780,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { if (autoAdjust) { // divide ranges into smaller ranges, based on the tablets RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); @@ -798,7 +802,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { if (!autoAdjust) for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); - AccumuloInputSplit.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); + SplitUtils.updateSplit(split, instance, tableConfig, principal, token, auths, logLevel); split.setOffline(tableConfig.isOfflineScan()); split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java index 6c870a0..9851192 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java @@ -19,9 +19,9 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Arrays; import org.apache.accumulo.core.client.mapreduce.impl.AccumuloInputSplit; +import org.apache.accumulo.core.client.mapreduce.impl.SplitUtils; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -47,6 +47,7 @@ public class RangeInputSplit extends AccumuloInputSplit { this.range = range; } + @Override public float getProgress(Key currentKey) { if (currentKey == null) return 0f; @@ -55,13 +56,13 @@ public class RangeInputSplit extends AccumuloInputSplit { if (range.getStartKey() != null && range.getEndKey() != null) { if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { // just look at the row progress - return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + return SplitUtils.getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { // just look at the column family progress - return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + return SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { // just look at the column qualifier progress - return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + return SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); } } } @@ -124,23 +125,10 @@ public class RangeInputSplit extends AccumuloInputSplit { StringBuilder sb = new StringBuilder(256); sb.append("RangeInputSplit:"); sb.append(" Range: ").append(range); - sb.append(" Locations: ").append(Arrays.asList(locations)); - sb.append(" Table: ").append(tableName); - sb.append(" TableID: ").append(tableId); - sb.append(" InstanceName: ").append(instanceName); - sb.append(" zooKeepers: ").append(zooKeepers); - sb.append(" principal: ").append(principal); - sb.append(" tokenSource: ").append(tokenSource); - sb.append(" authenticationToken: ").append(token); - sb.append(" authenticationTokenFile: ").append(tokenFile); - sb.append(" Authorizations: ").append(auths); + sb.append(super.toString()); sb.append(" offlineScan: ").append(offline); - sb.append(" mockInstance: ").append(mockInstance); sb.append(" isolatedScan: ").append(isolatedScan); sb.append(" localIterators: ").append(localIterators); - sb.append(" fetchColumns: ").append(fetchedColumns); - sb.append(" iterators: ").append(iterators); - sb.append(" logLevel: ").append(level); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java index 94d0026..7f83936 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/AccumuloInputSplit.java @@ -21,7 +21,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -33,18 +32,17 @@ import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mapreduce.InputTableConfig; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer; import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Base64; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.data.Key; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; @@ -58,16 +56,16 @@ import org.apache.log4j.Level; * @see org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit */ public abstract class AccumuloInputSplit extends InputSplit implements Writable { - protected String[] locations; - protected String tableId, tableName, instanceName, zooKeepers, principal; - protected TokenSource tokenSource; - protected String tokenFile; - protected AuthenticationToken token; - protected Boolean mockInstance; - protected Authorizations auths; - protected Set<Pair<Text,Text>> fetchedColumns; - protected List<IteratorSetting> iterators; - protected Level level; + private String[] locations; + private String tableId, tableName, instanceName, zooKeepers, principal; + private TokenSource tokenSource; + private String tokenFile; + private AuthenticationToken token; + private Boolean mockInstance; + private Authorizations auths; + private Set<Pair<Text,Text>> fetchedColumns; + private List<IteratorSetting> iterators; + private Level level; public abstract float getProgress(Key currentKey); @@ -89,26 +87,7 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable this.tableId = tableId; } - /** - * Central place to set common split configuration not handled by split constructors. - * The intention is to make it harder to miss optional setters in future refactor. - */ - public static void updateSplit(AccumuloInputSplit split, Instance instance, InputTableConfig tableConfig, - String principal, AuthenticationToken token, Authorizations auths, Level logLevel) { - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setMockInstance(instance instanceof MockInstance); - - split.setPrincipal(principal); - split.setToken(token); - split.setAuths(auths); - - split.setFetchedColumns(tableConfig.getFetchedColumns()); - split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); - } - - private static byte[] extractBytes(ByteSequence seq, int numBytes) { + static byte[] extractBytes(ByteSequence seq, int numBytes) { byte[] bytes = new byte[numBytes + 1]; bytes[0] = 0; for (int i = 0; i < numBytes; i++) { @@ -120,14 +99,6 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable return bytes; } - public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { - int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); - BigInteger startBI = new BigInteger(extractBytes(start, maxDepth)); - BigInteger endBI = new BigInteger(extractBytes(end, maxDepth)); - BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth)); - return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); - } - public long getRangeLength(Range range) throws IOException { Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow(); Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow(); @@ -442,4 +413,24 @@ public abstract class AccumuloInputSplit extends InputSplit implements Writable public void setLogLevel(Level level) { this.level = level; } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(256); + sb.append(" Locations: ").append(Arrays.asList(locations)); + sb.append(" Table: ").append(tableName); + sb.append(" TableID: ").append(tableId); + sb.append(" InstanceName: ").append(instanceName); + sb.append(" zooKeepers: ").append(zooKeepers); + sb.append(" principal: ").append(principal); + sb.append(" tokenSource: ").append(tokenSource); + sb.append(" authenticationToken: ").append(token); + sb.append(" authenticationTokenFile: ").append(tokenFile); + sb.append(" Authorizations: ").append(auths); + sb.append(" mockInstance: ").append(mockInstance); + sb.append(" fetchColumns: ").append(fetchedColumns); + sb.append(" iterators: ").append(iterators); + sb.append(" logLevel: ").append(level); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java index 269622a..24b9ef3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java @@ -53,6 +53,7 @@ public class BatchInputSplit extends AccumuloInputSplit { /** * Save progress on each call to this function, implied by value of currentKey, and return average ranges in the split */ + @Override public float getProgress(Key currentKey) { if (null == rangeProgress) rangeProgress = new float[ranges.size()]; @@ -70,13 +71,13 @@ public class BatchInputSplit extends AccumuloInputSplit { if (range.getStartKey() != null && range.getEndKey() != null) { if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { // just look at the row progress - rangeProgress[i] = getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { // just look at the column family progress - rangeProgress[i] = getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { // just look at the column qualifier progress - rangeProgress[i] = getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + rangeProgress[i] = SplitUtils.getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); } } total += rangeProgress[i]; @@ -126,19 +127,7 @@ public class BatchInputSplit extends AccumuloInputSplit { StringBuilder sb = new StringBuilder(256); sb.append("BatchInputSplit:"); sb.append(" Ranges: ").append(Arrays.asList(ranges)); - sb.append(" Location: ").append(Arrays.asList(locations)); - sb.append(" Table: ").append(tableName); - sb.append(" TableID: ").append(tableId); - sb.append(" InstanceName: ").append(instanceName); - sb.append(" zooKeepers: ").append(zooKeepers); - sb.append(" principal: ").append(principal); - sb.append(" tokenSource: ").append(tokenSource); - sb.append(" authenticationToken: ").append(token); - sb.append(" authenticationTokenFile: ").append(tokenFile); - sb.append(" Authorizations: ").append(auths); - sb.append(" fetchColumns: ").append(fetchedColumns); - sb.append(" iterators: ").append(iterators); - sb.append(" logLevel: ").append(level); + sb.append(super.toString()); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d1e6e79c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java new file mode 100644 index 0000000..0aee665 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/SplitUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.mapreduce.impl; + +import java.math.BigInteger; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.log4j.Level; + +public class SplitUtils { + + /** + * Central place to set common split configuration not handled by split constructors. + * The intention is to make it harder to miss optional setters in future refactor. + */ + public static void updateSplit(AccumuloInputSplit split, Instance instance, InputTableConfig tableConfig, + String principal, AuthenticationToken token, Authorizations auths, Level logLevel) { + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setMockInstance(instance instanceof MockInstance); + + split.setPrincipal(principal); + split.setToken(token); + split.setAuths(auths); + + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + } + + public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) { + int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length()); + BigInteger startBI = new BigInteger(AccumuloInputSplit.extractBytes(start, maxDepth)); + BigInteger endBI = new BigInteger(AccumuloInputSplit.extractBytes(end, maxDepth)); + BigInteger positionBI = new BigInteger(AccumuloInputSplit.extractBytes(position, maxDepth)); + return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue()); + } + +}