Updated Branches: refs/heads/1.6.0-SNAPSHOT cb1243a8a -> 016f3bb10
ACCUMULO-2128 Revert "ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed" This reverts commit 674fa95cacaa9353142071a66006e0ffb65cae94. Conflicts: core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/016f3bb1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/016f3bb1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/016f3bb1 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 016f3bb10c43f6461c5d41025b0e07b50f1638a2 Parents: cb1243a Author: Keith Turner <ktur...@apache.org> Authored: Mon Jan 6 15:21:30 2014 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Jan 6 15:21:30 2014 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AbstractInputFormat.java | 254 +++++++++---------- .../client/mapreduce/AccumuloOutputFormat.java | 5 +- .../mapreduce/lib/util/InputConfigurator.java | 6 +- 3 files changed, 127 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index 03c6a0a..35587d4 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -525,148 +525,144 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { Level logLevel = getLogLevel(context); log.setLevel(logLevel); validateOptions(context); - + LinkedList<InputSplit> splits = new LinkedList<InputSplit>(); Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context); - Instance instance = getInstance(context); - try { - for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { - - String tableName = tableConfigEntry.getKey(); - InputTableConfig tableConfig = tableConfigEntry.getValue(); - - boolean mockInstance; - String tableId; - // resolve table name to id once, and use id from this point forward - if (instance instanceof MockInstance) { - tableId = ""; - mockInstance = true; - } else { - try { - tableId = Tables.getTableId(instance, tableName); - } catch (TableNotFoundException e) { - throw new IOException(e); - } - mockInstance = false; - } - - Authorizations auths = getScanAuthorizations(context); - String principal = getPrincipal(context); - AuthenticationToken token = getAuthenticationToken(context); - - boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); - List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); - if (ranges.isEmpty()) { - ranges = new ArrayList<Range>(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - TabletLocator tl; + for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { + + String tableName = tableConfigEntry.getKey(); + InputTableConfig tableConfig = tableConfigEntry.getValue(); + + Instance instance = getInstance(context); + boolean mockInstance; + String tableId; + // resolve table name to id once, and use id from this point forward + if (instance instanceof MockInstance) { + tableId = ""; + mockInstance = true; + } else { try { - if (tableConfig.isOfflineScan()) { + tableId = Tables.getTableId(instance, tableName); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + mockInstance = false; + } + + Authorizations auths = getScanAuthorizations(context); + String principal = getPrincipal(context); + AuthenticationToken token = getAuthenticationToken(context); + + boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); + List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); + if (ranges.isEmpty()) { + ranges = new ArrayList<Range>(1); + ranges.add(new Range()); + } + + // get the metadata information for these ranges + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); + TabletLocator tl; + try { + if (tableConfig.isOfflineScan()) { + binnedRanges = binOfflineTable(context, tableId, ranges); + while (binnedRanges == null) { + // Some tablets were still online, try again + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms binnedRanges = binOfflineTable(context, tableId, ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - binnedRanges = binOfflineTable(context, tableId, ranges); - + + } + } else { + tl = getTabletLocator(context, tableId); + // its possible that the cache could contain complete, but old information about a tables tablets... so clear it + tl.invalidateCache(); + Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context)); + + while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { + if (!(instance instanceof MockInstance)) { + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); } - } else { - tl = getTabletLocator(context, tableId); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. Retrying."); + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms tl.invalidateCache(); - Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context)); - - while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms - tl.invalidateCache(); - } } - } catch (Exception e) { - throw new IOException(e); } - - HashMap<Range,ArrayList<String>> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap<Range,ArrayList<String>>(); - - HashMap<String,String> hostNameCache = new HashMap<String,String>(); - for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getCanonicalHostName(); - hostNameCache.put(ip, location); - } - for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); - - split.setOffline(tableConfig.isOfflineScan()); - split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); - split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); - split.setMockInstance(mockInstance); - split.setFetchedColumns(tableConfig.getFetchedColumns()); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); - - splits.add(split); - } else { - // don't divide ranges - ArrayList<String> locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList<String>(1); - locations.add(location); - splitsToAdd.put(r, locations); - } + } catch (Exception e) { + throw new IOException(e); + } + + HashMap<Range,ArrayList<String>> splitsToAdd = null; + + if (!autoAdjust) + splitsToAdd = new HashMap<Range,ArrayList<String>>(); + + HashMap<String,String> hostNameCache = new HashMap<String,String>(); + for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { + String ip = tserverBin.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getCanonicalHostName(); + hostNameCache.put(ip, location); + } + for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { + Range ke = extentRanges.getKey().toDataRange(); + for (Range r : extentRanges.getValue()) { + if (autoAdjust) { + // divide ranges into smaller ranges, based on the tablets + RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } else { + // don't divide ranges + ArrayList<String> locations = splitsToAdd.get(r); + if (locations == null) + locations = new ArrayList<String>(1); + locations.add(location); + splitsToAdd.put(r, locations); } } } - - if (!autoAdjust) - for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { - RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); - - split.setOffline(tableConfig.isOfflineScan()); - split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); - split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); - split.setMockInstance(mockInstance); - split.setFetchedColumns(tableConfig.getFetchedColumns()); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); - - splits.add(split); - } } - return splits; - } finally { - instance.close(); + + if (!autoAdjust) + for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { + RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } } + return splits; } // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index b816d43..0c924b1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -533,20 +533,17 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public void checkOutputSpecs(JobContext job) throws IOException { if (!isConnectorInfoSet(job)) throw new IOException("Connector info has not been set."); - Instance instance = getInstance(job); try { // if the instance isn't configured, it will complain here String principal = getPrincipal(job); AuthenticationToken token = getAuthenticationToken(job); - Connector c = instance.getConnector(principal, token); + Connector c = getInstance(job).getConnector(principal, token); if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); } catch (AccumuloException e) { throw new IOException(e); } catch (AccumuloSecurityException e) { throw new IOException(e); - } finally { - instance.close(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/016f3bb1/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java index 9454d59..7b17d11 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java @@ -625,12 +625,10 @@ public class InputConfigurator extends ConfiguratorBase { if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey)) throw new IOException("Instance info has not been set."); // validate that we can connect as configured - Instance inst = getInstance(implementingClass, conf); try { String principal = getPrincipal(implementingClass, conf); AuthenticationToken token = getAuthenticationToken(implementingClass, conf); - - Connector c = inst.getConnector(principal, token); + Connector c = getInstance(implementingClass, conf).getConnector(principal, token); if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); @@ -658,8 +656,6 @@ public class InputConfigurator extends ConfiguratorBase { throw new IOException(e); } catch (TableNotFoundException e) { throw new IOException(e); - } finally { - inst.close(); } }