Updated Branches: refs/heads/master 16218af38 -> f7fd2989d
ACCUMULO-1889 found a few more ZooKeeperInstances that are not closed Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/674fa95c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/674fa95c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/674fa95c Branch: refs/heads/master Commit: 674fa95cacaa9353142071a66006e0ffb65cae94 Parents: 6d083e4 Author: Eric Newton <eric.new...@gmail.com> Authored: Thu Dec 19 16:27:16 2013 -0500 Committer: Eric Newton <eric.new...@gmail.com> Committed: Thu Dec 19 16:27:16 2013 -0500 ---------------------------------------------------------------------- .../client/mapreduce/AbstractInputFormat.java | 254 ++++++++++--------- .../client/mapreduce/AccumuloOutputFormat.java | 5 +- .../mapreduce/lib/util/InputConfigurator.java | 12 +- 3 files changed, 143 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index 2b7e958..9f30563 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -525,144 +525,148 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { Level logLevel = getLogLevel(context); log.setLevel(logLevel); validateOptions(context); - + LinkedList<InputSplit> splits = new LinkedList<InputSplit>(); Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context); - for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { - - String tableName = tableConfigEntry.getKey(); - InputTableConfig tableConfig = tableConfigEntry.getValue(); - - Instance instance = getInstance(context); - boolean mockInstance; - String tableId; - // resolve table name to id once, and use id from this point forward - if (instance instanceof MockInstance) { - tableId = ""; - mockInstance = true; - } else { - try { - tableId = Tables.getTableId(instance, tableName); - } catch (TableNotFoundException e) { - throw new IOException(e); + Instance instance = getInstance(context); + try { + for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) { + + String tableName = tableConfigEntry.getKey(); + InputTableConfig tableConfig = tableConfigEntry.getValue(); + + boolean mockInstance; + String tableId; + // resolve table name to id once, and use id from this point forward + if (instance instanceof MockInstance) { + tableId = ""; + mockInstance = true; + } else { + try { + tableId = Tables.getTableId(instance, tableName); + } catch (TableNotFoundException e) { + throw new IOException(e); + } + mockInstance = false; } - mockInstance = false; - } - - Authorizations auths = getScanAuthorizations(context); - String principal = getPrincipal(context); - AuthenticationToken token = getAuthenticationToken(context); - - boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); - List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); - if (ranges.isEmpty()) { - ranges = new ArrayList<Range>(1); - ranges.add(new Range()); - } - - // get the metadata information for these ranges - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - TabletLocator tl; - try { - if (tableConfig.isOfflineScan()) { - binnedRanges = binOfflineTable(context, tableId, ranges); - while (binnedRanges == null) { - // Some tablets were still online, try again - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + + Authorizations auths = getScanAuthorizations(context); + String principal = getPrincipal(context); + AuthenticationToken token = getAuthenticationToken(context); + + boolean autoAdjust = tableConfig.shouldAutoAdjustRanges(); + List<Range> ranges = autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges(); + if (ranges.isEmpty()) { + ranges = new ArrayList<Range>(1); + ranges.add(new Range()); + } + + // get the metadata information for these ranges + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); + TabletLocator tl; + try { + if (tableConfig.isOfflineScan()) { binnedRanges = binOfflineTable(context, tableId, ranges); - - } - } else { - tl = getTabletLocator(context, tableId); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); - Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context)); - - while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { - if (!(instance instanceof MockInstance)) { - if (!Tables.exists(instance, tableId)) - throw new TableDeletedException(tableId); - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); + while (binnedRanges == null) { + // Some tablets were still online, try again + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + binnedRanges = binOfflineTable(context, tableId, ranges); + } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + } else { + tl = getTabletLocator(context, tableId); + // its possible that the cache could contain complete, but old information about a tables tablets... so clear it tl.invalidateCache(); + Credentials creds = new Credentials(getPrincipal(context), getAuthenticationToken(context)); + + while (!tl.binRanges(creds, ranges, binnedRanges).isEmpty()) { + if (!(instance instanceof MockInstance)) { + if (!Tables.exists(instance, tableId)) + throw new TableDeletedException(tableId); + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + } + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. Retrying."); + UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms + tl.invalidateCache(); + } } + } catch (Exception e) { + throw new IOException(e); } - } catch (Exception e) { - throw new IOException(e); - } - - HashMap<Range,ArrayList<String>> splitsToAdd = null; - - if (!autoAdjust) - splitsToAdd = new HashMap<Range,ArrayList<String>>(); - - HashMap<String,String> hostNameCache = new HashMap<String,String>(); - for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { - String ip = tserverBin.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { - Range ke = extentRanges.getKey().toDataRange(); - for (Range r : extentRanges.getValue()) { - if (autoAdjust) { - // divide ranges into smaller ranges, based on the tablets - RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); - - split.setOffline(tableConfig.isOfflineScan()); - split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); - split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); - split.setMockInstance(mockInstance); - split.setFetchedColumns(tableConfig.getFetchedColumns()); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); - - splits.add(split); - } else { - // don't divide ranges - ArrayList<String> locations = splitsToAdd.get(r); - if (locations == null) - locations = new ArrayList<String>(1); - locations.add(location); - splitsToAdd.put(r, locations); + + HashMap<Range,ArrayList<String>> splitsToAdd = null; + + if (!autoAdjust) + splitsToAdd = new HashMap<Range,ArrayList<String>>(); + + HashMap<String,String> hostNameCache = new HashMap<String,String>(); + for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) { + String ip = tserverBin.getKey().split(":", 2)[0]; + String location = hostNameCache.get(ip); + if (location == null) { + InetAddress inetAddress = InetAddress.getByName(ip); + location = inetAddress.getHostName(); + hostNameCache.put(ip, location); + } + for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) { + Range ke = extentRanges.getKey().toDataRange(); + for (Range r : extentRanges.getValue()) { + if (autoAdjust) { + // divide ranges into smaller ranges, based on the tablets + RangeInputSplit split = new RangeInputSplit(tableName, tableId, ke.clip(r), new String[] {location}); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } else { + // don't divide ranges + ArrayList<String> locations = splitsToAdd.get(r); + if (locations == null) + locations = new ArrayList<String>(1); + locations.add(location); + splitsToAdd.put(r, locations); + } } } } + + if (!autoAdjust) + for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { + RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); + + split.setOffline(tableConfig.isOfflineScan()); + split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); + split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); + split.setMockInstance(mockInstance); + split.setFetchedColumns(tableConfig.getFetchedColumns()); + split.setPrincipal(principal); + split.setToken(token); + split.setInstanceName(instance.getInstanceName()); + split.setZooKeepers(instance.getZooKeepers()); + split.setAuths(auths); + split.setIterators(tableConfig.getIterators()); + split.setLogLevel(logLevel); + + splits.add(split); + } } - - if (!autoAdjust) - for (Map.Entry<Range,ArrayList<String>> entry : splitsToAdd.entrySet()) { - RangeInputSplit split = new RangeInputSplit(tableName, tableId, entry.getKey(), entry.getValue().toArray(new String[0])); - - split.setOffline(tableConfig.isOfflineScan()); - split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners()); - split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators()); - split.setMockInstance(mockInstance); - split.setFetchedColumns(tableConfig.getFetchedColumns()); - split.setPrincipal(principal); - split.setToken(token); - split.setInstanceName(instance.getInstanceName()); - split.setZooKeepers(instance.getZooKeepers()); - split.setAuths(auths); - split.setIterators(tableConfig.getIterators()); - split.setLogLevel(logLevel); - - splits.add(split); - } + return splits; + } finally { + instance.close(); } - return splits; } // use reflection to pull the Configuration out of the JobContext for Hadoop 1 and Hadoop 2 compatibility http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index 0c924b1..b816d43 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -533,17 +533,20 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { public void checkOutputSpecs(JobContext job) throws IOException { if (!isConnectorInfoSet(job)) throw new IOException("Connector info has not been set."); + Instance instance = getInstance(job); try { // if the instance isn't configured, it will complain here String principal = getPrincipal(job); AuthenticationToken token = getAuthenticationToken(job); - Connector c = getInstance(job).getConnector(principal, token); + Connector c = instance.getConnector(principal, token); if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); } catch (AccumuloException e) { throw new IOException(e); } catch (AccumuloSecurityException e) { throw new IOException(e); + } finally { + instance.close(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/674fa95c/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java index 7b17d11..7419d9b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/InputConfigurator.java @@ -601,7 +601,11 @@ public class InputConfigurator extends ConfiguratorBase { if ("MockInstance".equals(instanceType)) return new MockTabletLocator(); Instance instance = getInstance(implementingClass, conf); - return TabletLocator.getLocator(instance, new Text(tableId)); + try { + return TabletLocator.getLocator(instance, new Text(tableId)); + } finally { + instance.close(); + } } // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job) @@ -625,10 +629,12 @@ public class InputConfigurator extends ConfiguratorBase { if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey)) throw new IOException("Instance info has not been set."); // validate that we can connect as configured + Instance inst = getInstance(implementingClass, conf); try { String principal = getPrincipal(implementingClass, conf); AuthenticationToken token = getAuthenticationToken(implementingClass, conf); - Connector c = getInstance(implementingClass, conf).getConnector(principal, token); + + Connector c = inst.getConnector(principal, token); if (!c.securityOperations().authenticateUser(principal, token)) throw new IOException("Unable to authenticate user"); @@ -656,6 +662,8 @@ public class InputConfigurator extends ConfiguratorBase { throw new IOException(e); } catch (TableNotFoundException e) { throw new IOException(e); + } finally { + inst.close(); } }