ACCUMULO-4318 Made writers and scanners auto closeable
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e67317cb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e67317cb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e67317cb

Branch: refs/heads/master
Commit: e67317cb267744cae11872d373223234459600be
Parents: 61a7de4
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Jun 2 12:26:02 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Jun 2 12:26:02 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/BatchWriter.java       |    3 +-
 .../accumulo/core/client/ConditionalWriter.java |    3 +-
 .../accumulo/core/client/ScannerBase.java       |    5 +-
 .../core/client/impl/ScannerImplTest.java       |    4 +
 .../impl/TabletServerBatchReaderTest.java       |    6 +-
 .../examples/simple/reservations/ARS.java       |   45 +-
 .../server/util/MasterMetadataUtil.java         |   57 +-
 .../accumulo/server/util/MetadataTableUtil.java |  288 ++---
 .../accumulo/gc/SimpleGarbageCollector.java     |    4 +-
 .../accumulo/master/tableOps/CopyFailed.java    |   21 +-
 .../apache/accumulo/tserver/TabletServer.java   |   14 +-
 .../accumulo/test/ConditionalWriterIT.java      | 1144 +++++++++---------
 .../test/functional/BatchWriterFlushIT.java     |   33 +-
 .../accumulo/test/functional/ReadWriteIT.java   |   62 +-
 .../test/functional/SplitRecoveryIT.java        |   72 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |   24 +-
 .../test/randomwalk/conditional/Transfer.java   |   94 +-
 .../test/randomwalk/conditional/Verify.java     |   37 +-
 18 files changed, 959 insertions(+), 957 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
index b4d81aa..95d87c5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Mutation;
  * In the event that a MutationsRejectedException is thrown by one of the methods on a BatchWriter instance, the user should close the current
  * instance and create a new instance. This is a known limitation which will be addressed by ACCUMULO-2990 in the future.
  */
-public interface BatchWriter {
+public interface BatchWriter extends AutoCloseable {

   /**
    * Queues one mutation to write.
@@ -66,6 +66,7 @@ public interface BatchWriter {
    * @throws MutationsRejectedException
    *           this could be thrown because current or previous mutations failed
    */
+  @Override
   void close() throws MutationsRejectedException;

 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 62244e6..d13dc09 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.data.ConditionalMutation;
  *
  * @since 1.6.0
  */
-public interface ConditionalWriter {
+public interface ConditionalWriter extends AutoCloseable {

   class Result {

     private Status status;
@@ -131,5 +131,6 @@ public interface ConditionalWriter {
   /**
    * release any resources (like thread pools) used by the conditional writer
    */
+  @Override
   void close();
 }
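With BatchWriter and ConditionalWriter now extending AutoCloseable (ScannerBase follows below), client code can rely on try-with-resources instead of explicit finally blocks. A minimal sketch of the pattern this enables, assuming an existing Connector named conn and a table named "mytable" (both are placeholders, not part of this commit):

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.MutationsRejectedException;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.data.Mutation;

    public class AutoCloseExample {
      // Sketch only: conn and "mytable" are assumed to exist already.
      static void writeOneRow(Connector conn) throws TableNotFoundException, MutationsRejectedException {
        try (BatchWriter bw = conn.createBatchWriter("mytable", new BatchWriterConfig())) {
          Mutation m = new Mutation("row1");
          m.put("cf", "cq", "value");
          bw.addMutation(m);
        } // close() runs here, flushing queued mutations or throwing MutationsRejectedException
      }
    }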
   *
   * @since 1.5.0
   */
+  @Override
   void close();

   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 354f6f4..2110050 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
  * This class hosts configuration methods that are shared between different types of scanners.
  *
  */
-public interface ScannerBase extends Iterable<Entry<Key,Value>> {
+public interface ScannerBase extends Iterable<Entry<Key,Value>>, AutoCloseable {

   /**
    * Add a server-side scan iterator.
@@ -160,10 +160,11 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>> {
   long getTimeout(TimeUnit timeUnit);

   /**
-   * Closes any underlying connections on the scanner
+   * Closes any underlying connections on the scanner. This may invalidate any iterators derived from the Scanner, causing them to throw exceptions.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
index eedc61d..38e3c07 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java
@@ -45,12 +45,14 @@ public class ScannerImplTest {

     s.setReadaheadThreshold(Long.MAX_VALUE);

     Assert.assertEquals(Long.MAX_VALUE, s.getReadaheadThreshold());
+    s.close();
   }

   @Test(expected = IllegalArgumentException.class)
   public void testInValidReadaheadValues() {
     Scanner s = new ScannerImpl(context, "foo", Authorizations.EMPTY);
     s.setReadaheadThreshold(-1);
+    s.close();
   }

   @Test
@@ -58,8 +60,10 @@ public class ScannerImplTest {
     Authorizations expected = new Authorizations("a,b");
     Scanner s = new ScannerImpl(context, "foo", expected);
     assertEquals(expected, s.getAuthorizations());
+    s.close();
   }

+  @SuppressWarnings("resource")
   @Test(expected = IllegalArgumentException.class)
   public void testNullAuthorizationsFails() {
     new ScannerImpl(context, "foo", null);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
index b31050a..af4a474 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java
@@ -36,10 +36,12 @@ public class TabletServerBatchReaderTest {
   @Test
   public void testGetAuthorizations() {
     Authorizations expected = new Authorizations("a,b");
-    BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1);
-    assertEquals(expected, s.getAuthorizations());
+    try (BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1)) {
+      assertEquals(expected, s.getAuthorizations());
+    }
   }

+  @SuppressWarnings("resource")
   @Test(expected = IllegalArgumentException.class)
   public void testNullAuthorizationsFails() {
     new TabletServerBatchReader(context, "foo", null, 1);
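The @SuppressWarnings("resource") on the null-argument tests above is worth a remark: those constructors are expected to throw IllegalArgumentException, so no scanner instance ever exists to close, and the resource-leak warning that AutoCloseable types now trigger in IDEs and static analysis is a false positive there. The same test, restated with explanatory comments (no new API, only annotation rationale):

    @SuppressWarnings("resource") // the constructor throws; there is never an instance to close
    @Test(expected = IllegalArgumentException.class)
    public void testNullAuthorizationsFails() {
      // Fails fast on null Authorizations; nothing is returned, so nothing can leak.
      new TabletServerBatchReader(context, "foo", null, 1);
    }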
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
index b9e1a83..d99f7af 100644
--- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
+++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
@@ -20,8 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;

-import jline.console.ConsoleReader;
-
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Status;
@@ -41,6 +39,8 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import jline.console.ConsoleReader;
+
 /**
  * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait lists are also
  * supported. In order to keep the example simple, no checking is done of the date. Also the code is inefficient; if interested in improving it, take a look at
@@ -88,9 +88,9 @@ public class ARS {

     ReservationResult result = ReservationResult.RESERVED;

-    ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
-    try {
+    // it is important to use an isolated scanner so that only whole mutations are seen
+    try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
       while (true) {
         Status status = cwriter.write(update).getStatus();
         switch (status) {
@@ -109,8 +109,6 @@
         // that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within
         // the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.

-        // it is important to use an isolated scanner so that only whole mutations are seen
-        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
         scanner.setRange(new Range(row));

         int seq = -1;
@@ -152,10 +150,7 @@
           else
             result = ReservationResult.WAIT_LISTED;
         }
-    } finally {
-      cwriter.close();
     }
-
   }

   public void cancel(String what, String when, String who) throws Exception {
@@ -166,13 +161,10 @@
     // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
     // when it actually got the reservation.
-    ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
-
-    try {
+    // it's important to use an isolated scanner so that only whole mutations are seen
+    try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
       while (true) {
-
-        // its important to use an isolated scanner so that only whole mutations are seen
-        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
         scanner.setRange(new Range(row));

         int seq = -1;
@@ -217,8 +209,6 @@ public class ARS {
         }

       }
-    } finally {
-      cwriter.close();
     }
   }

@@ -226,18 +216,19 @@ public class ARS {
     String row = what + ":" + when;

     // it's important to use an isolated scanner so that only whole mutations are seen
-    Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY));
-    scanner.setRange(new Range(row));
-    scanner.fetchColumnFamily(new Text("res"));
+    try (Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+      scanner.setRange(new Range(row));
+      scanner.fetchColumnFamily(new Text("res"));

-    List<String> reservations = new ArrayList<String>();
+      List<String> reservations = new ArrayList<String>();

-    for (Entry<Key,Value> entry : scanner) {
-      String val = entry.getValue().toString();
-      reservations.add(val);
-    }
+      for (Entry<Key,Value> entry : scanner) {
+        String val = entry.getValue().toString();
+        reservations.add(val);
+      }

-    return reservations;
+      return reservations;
+    }
   }

   public static void main(String[] args) throws Exception {
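A side note on the two-resource try blocks introduced in ARS above: Java closes resources declared in a single try-with-resources in reverse declaration order, so the scanner is closed before the conditional writer. A self-contained illustration of that ordering (the Resource class here is hypothetical):

    public class CloseOrder {
      static class Resource implements AutoCloseable {
        private final String name;

        Resource(String name) {
          this.name = name;
        }

        @Override
        public void close() {
          System.out.println("closed " + name);
        }
      }

      public static void main(String[] args) {
        // Prints "working", then "closed scanner", then "closed writer".
        try (Resource writer = new Resource("writer");
            Resource scanner = new Resource("scanner")) {
          System.out.println("working");
        }
      }
    }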
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index b9e52e3..5aa61bc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.util;

+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;

 import java.io.IOException;
@@ -61,8 +62,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 /**
  *
  */
@@ -151,42 +150,44 @@ public class MasterMetadataUtil {

     // check to see if prev tablet exist in metadata tablet
     Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow)));

-    ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
-    scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+    try (ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+      scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+
+      VolumeManager fs = VolumeManagerImpl.get();
+      if (!scanner2.iterator().hasNext()) {
+        log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+        MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
+        return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
+      } else {
+        log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);

-    VolumeManager fs = VolumeManagerImpl.get();
-    if (!scanner2.iterator().hasNext()) {
-      log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
-      MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock);
-      return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
-    } else {
-      log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
+        List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();

-      List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
+        SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+        SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+        SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();

-      Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
-      Key rowKey = new Key(metadataEntry);
+        try (Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+          Key rowKey = new Key(metadataEntry);

-      SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
-      SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
-      SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
-      scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
+          scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
+          scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));

-      for (Entry<Key,Value> entry : scanner3) {
-        if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
-          origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+          for (Entry<Key,Value> entry : scanner3) {
+            if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
+              origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
+            }
+          }
         }
-      }

-      MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
-          highDatafileSizes, highDatafilesToRemove);
+        MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
+            highDatafileSizes, highDatafilesToRemove);

-      MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);
+        MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock);

-      return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+        return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
+      }
     }
-  }

   private static TServerInstance getTServerInstance(String address, ZooLock zooLock) {
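The restructured split-repair code above now returns from inside its try-with-resources blocks. That is safe: the return expression is evaluated first, then the scanners are closed, and only then does control leave the method. A small demonstration of that sequencing (all names here are illustrative):

    public class ReturnVsClose {
      static class Probe implements AutoCloseable {
        @Override
        public void close() {
          System.out.println("2: close() runs");
        }
      }

      static String compute() {
        try (Probe p = new Probe()) {
          System.out.println("1: return value computed");
          return "result"; // evaluated before close(), delivered after it
        }
      }

      public static void main(String[] args) {
        System.out.println("3: caller receives " + compute()); // prints 1, 2, 3 in order
      }
    }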
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5081a9c..416a296 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -280,24 +280,25 @@ public class MetadataTableUtil {

   public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
     TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();

-    Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
-    mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-    Text row = extent.getMetadataEntry();
-    VolumeManager fs = VolumeManagerImpl.get();
+    try (Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      Text row = extent.getMetadataEntry();
+      VolumeManager fs = VolumeManagerImpl.get();

-    Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
-    endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
+      Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
+      endKey = endKey.followingKey(PartialKey.ROW_COLFAM);

-    mdScanner.setRange(new Range(new Key(row), endKey));
-    for (Entry<Key,Value> entry : mdScanner) {
+      mdScanner.setRange(new Range(new Key(row), endKey));
+      for (Entry<Key,Value> entry : mdScanner) {

-      if (!entry.getKey().getRow().equals(row))
-        break;
-      DataFileValue dfv = new DataFileValue(entry.getValue().get());
-      sizes.put(new FileRef(fs, entry.getKey()), dfv);
-    }
+        if (!entry.getKey().getRow().equals(row))
+          break;
+        DataFileValue dfv = new DataFileValue(entry.getValue().get());
+        sizes.put(new FileRef(fs, entry.getKey()), dfv);
+      }

-    return sizes;
+      return sizes;
+    }
   }

   public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ClientContext context, ZooLock zooLock) {
@@ -415,60 +416,59 @@
   }

   public static void deleteTable(String tableId, boolean insertDeletes, ClientContext context, ZooLock lock) throws AccumuloException, IOException {
-    Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
-    BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000).setMaxLatency(120000l, TimeUnit.MILLISECONDS)
-        .setMaxWriteThreads(2));
+    try (Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
+        BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000)
+            .setMaxLatency(120000l, TimeUnit.MILLISECONDS).setMaxWriteThreads(2))) {

-    // scan metadata for our table and delete everything we find
-    Mutation m = null;
-    ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+      // scan metadata for our table and delete everything we find
+      Mutation m = null;
+      ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());

-    // insert deletes before deleting data from metadata... this makes the code fault tolerant
-    if (insertDeletes) {
+      // insert deletes before deleting data from metadata... this makes the code fault tolerant
+      if (insertDeletes) {

-      ms.fetchColumnFamily(DataFileColumnFamily.NAME);
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
+        ms.fetchColumnFamily(DataFileColumnFamily.NAME);
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);

-      for (Entry<Key,Value> cell : ms) {
-        Key key = cell.getKey();
+        for (Entry<Key,Value> cell : ms) {
+          Key key = cell.getKey();

-        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-          FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
-          bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
-        }
+          if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+            FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
+            bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
+          }

-        if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
-          bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+          if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
+            bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
+          }
         }
-      }

-      bw.flush();
+        bw.flush();

-      ms.clearColumns();
-    }
+        ms.clearColumns();
+      }

-    for (Entry<Key,Value> cell : ms) {
-      Key key = cell.getKey();
+      for (Entry<Key,Value> cell : ms) {
+        Key key = cell.getKey();

-      if (m == null) {
-        m = new Mutation(key.getRow());
-        if (lock != null)
-          putLockID(lock, m);
+        if (m == null) {
+          m = new Mutation(key.getRow());
+          if (lock != null)
+            putLockID(lock, m);
+        }
+
+        if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+          bw.addMutation(m);
+          m = new Mutation(key.getRow());
+          if (lock != null)
+            putLockID(lock, m);
+        }
+        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
       }

-      if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
+      if (m != null)
         bw.addMutation(m);
-        m = new Mutation(key.getRow());
-        if (lock != null)
-          putLockID(lock, m);
-      }
-      m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
     }
-
-    if (m != null)
-      bw.addMutation(m);
-
-    bw.close();
   }

   static String getZookeeperLogLocation() {
@@ -521,23 +521,24 @@
     } else {
       String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
-      Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY);
-      scanner.fetchColumnFamily(LogColumnFamily.NAME);
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.setRange(extent.toMetadataRange());
+      try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) {
+        scanner.fetchColumnFamily(LogColumnFamily.NAME);
+        scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+        scanner.setRange(extent.toMetadataRange());

-      for (Entry<Key,Value> entry : scanner) {
-        if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
-          throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
-        }
+        for (Entry<Key,Value> entry : scanner) {
+          if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
+            throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
+          }

-        if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
-          result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
-        } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
-          DataFileValue dfv = new DataFileValue(entry.getValue().get());
-          sizes.put(new FileRef(fs, entry.getKey()), dfv);
-        } else {
-          throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+          if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
+            result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
+          } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+            DataFileValue dfv = new DataFileValue(entry.getValue().get());
+            sizes.put(new FileRef(fs, entry.getKey()), dfv);
+          } else {
+            throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
+          }
         }
       }
     }
@@ -828,58 +829,56 @@ public class MetadataTableUtil {

   public static void cloneTable(ClientContext context, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception {

     Connector conn = context.getConnector();

-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    try (BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {

-    while (true) {
+      while (true) {

-      try {
-        initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+        try {
+          initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);

-        // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed
+          // the following loop looks for changes in the files that occurred during the copy... if files were dereferenced then they could have been GCed

-        while (true) {
-          int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
+          while (true) {
+            int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);

-          if (rewrites == 0)
-            break;
-        }
+            if (rewrites == 0)
+              break;
+          }

-        bw.flush();
-        break;
+          bw.flush();
+          break;

-      } catch (TabletIterator.TabletDeletedException tde) {
-        // tablets were merged in the src table
-        bw.flush();
+        } catch (TabletIterator.TabletDeletedException tde) {
+          // tablets were merged in the src table
+          bw.flush();

-        // delete what we have cloned and try again
-        deleteTable(tableId, false, context, null);
+          // delete what we have cloned and try again
+          deleteTable(tableId, false, context, null);

-        log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
+          log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");

-        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
       }
-    }

-    // delete the clone markers and create directory entries
-    Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
+      // delete the clone markers and create directory entries
+      Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+      mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);

-    int dirCount = 0;
+      int dirCount = 0;

-    for (Entry<Key,Value> entry : mscanner) {
-      Key k = entry.getKey();
-      Mutation m = new Mutation(k.getRow());
-      m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
-      String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
-          + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
+      for (Entry<Key,Value> entry : mscanner) {
+        Key k = entry.getKey();
+        Mutation m = new Mutation(k.getRow());
+        m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
+        String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
+            + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));

-      bw.addMutation(m);
+        bw.addMutation(m);
+      }
     }
-
-    bw.close();
-  }

   public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooLock zooLock) {
@@ -889,27 +888,26 @@ public class MetadataTableUtil {
   }

   public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception {
-    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-    mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    for (Entry<Key,Value> entry : mscanner) {
-      log.debug("Looking at entry " + entry + " with tid " + tid);
-      if (Long.parseLong(entry.getValue().toString()) == tid) {
log.debug("deleting entry " + entry); - Mutation m = new Mutation(entry.getKey().getRow()); - m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); - bw.addMutation(m); + try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) { + mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); + for (Entry<Key,Value> entry : mscanner) { + log.debug("Looking at entry " + entry + " with tid " + tid); + if (Long.parseLong(entry.getValue().toString()) == tid) { + log.debug("deleting entry " + entry); + Mutation m = new Mutation(entry.getKey().getRow()); + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + bw.addMutation(m); + } } } - bw.close(); } public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException { List<FileRef> result = new ArrayList<FileRef>(); - try { + try (Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) { VolumeManager fs = VolumeManagerImpl.get(); - Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY)); mscanner.setRange(extent.toMetadataRange()); mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); for (Entry<Key,Value> entry : mscanner) { @@ -917,6 +915,7 @@ public class MetadataTableUtil { result.add(new FileRef(fs, entry.getKey())); } } + return result; } catch (TableNotFoundException ex) { // unlikely @@ -929,16 +928,17 @@ public class MetadataTableUtil { Map<Long,List<FileRef>> result = new HashMap<>(); VolumeManager fs = VolumeManagerImpl.get(); - Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY); - scanner.setRange(new Range(metadataRow)); - scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - for (Entry<Key,Value> entry : scanner) { - Long tid = Long.parseLong(entry.getValue().toString()); - List<FileRef> lst = result.get(tid); - if (lst == null) { - result.put(tid, lst = new ArrayList<>()); + try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? 
+    try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) {
+      scanner.setRange(new Range(metadataRow));
+      scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+      for (Entry<Key,Value> entry : scanner) {
+        Long tid = Long.parseLong(entry.getValue().toString());
+        List<FileRef> lst = result.get(tid);
+        if (lst == null) {
+          result.put(tid, lst = new ArrayList<>());
+        }
+        lst.add(new FileRef(fs, entry.getKey()));
       }
-      lst.add(new FileRef(fs, entry.getKey()));
     }
     return result;
   }
@@ -985,14 +985,15 @@ public class MetadataTableUtil {
     Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);

     // move old delete markers to new location, to standardize table schema between all metadata tables
-    Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY);
-    scanner.setRange(oldDeletesRange);
-    for (Entry<Key,Value> entry : scanner) {
-      String row = entry.getKey().getRow().toString();
-      if (row.startsWith(oldDeletesPrefix)) {
-        moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
-      } else {
-        break;
+    try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) {
+      scanner.setRange(oldDeletesRange);
+      for (Entry<Key,Value> entry : scanner) {
+        String row = entry.getKey().getRow().toString();
+        if (row.startsWith(oldDeletesPrefix)) {
+          moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
+        } else {
+          break;
+        }
       }
     }
   }
@@ -1002,14 +1003,15 @@ public class MetadataTableUtil {
     KeyExtent notMetadata = new KeyExtent("anythingNotMetadata", null, null);

     // move delete markers from the normal delete keyspace to the root tablet delete keyspace if the files are for the !METADATA table
-    Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
-    scanner.setRange(MetadataSchema.DeletesSection.getRange());
-    for (Entry<Key,Value> entry : scanner) {
-      String row = entry.getKey().getRow().toString();
-      if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
-        moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
-      } else {
-        break;
+    try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
+      scanner.setRange(MetadataSchema.DeletesSection.getRange());
+      for (Entry<Key,Value> entry : scanner) {
+        String row = entry.getKey().getRow().toString();
+        if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
+          moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
+        } else {
+          break;
+        }
       }
     }
   }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5e8c038..cc43802 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.gc;

+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -110,7 +112,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import com.google.protobuf.InvalidProtocolBufferException;

 public class SimpleGarbageCollector extends AccumuloServerContext implements Iface {
@@ -269,6 +270,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa

     @Override
     public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+      @SuppressWarnings("resource")
       IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY));

       scanner.setRange(MetadataSchema.BlipSection.getRange());
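In getBlipIterator above, the scanner must outlive the method because it backs the returned Iterator, so it cannot be closed there; the new @SuppressWarnings("resource") documents a deliberate escape rather than a leak. If callers ever needed to release such a scanner explicitly, one possible design (a hypothetical sketch, not part of this commit) is an iterator type that is itself closeable:

    import java.util.Iterator;

    // Couples an Iterator with the AutoCloseable resource backing it, so the
    // caller can release the underlying scanner once iteration is finished.
    public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
      @Override
      void close(); // no checked exception, to keep call sites simple
    }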
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
index 068aa81..5fbf3a0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java
@@ -111,16 +111,17 @@ class CopyFailed extends MasterRepo {

     // determine which failed files were loaded
     Connector conn = master.getConnector();
-    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-    mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
-    for (Entry<Key,Value> entry : mscanner) {
-      if (Long.parseLong(entry.getValue().toString()) == tid) {
-        FileRef loadedFile = new FileRef(fs, entry.getKey());
-        String absPath = failures.remove(loadedFile);
-        if (absPath != null) {
-          loadedFailures.put(loadedFile, absPath);
+    try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY))) {
+      mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+      mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
+
+      for (Entry<Key,Value> entry : mscanner) {
+        if (Long.parseLong(entry.getValue().toString()) == tid) {
+          FileRef loadedFile = new FileRef(fs, entry.getKey());
+          String absPath = failures.remove(loadedFile);
+          if (absPath != null) {
+            loadedFailures.put(loadedFile, absPath);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e67317cb/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1523c55..6427b29 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -142,6 +142,8 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -256,8 +258,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.common.net.HostAndPort;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;

 public class TabletServer extends AccumuloServerContext implements Runnable {
@@ -2595,12 +2595,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN,
         TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN});

-    ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY);
-    scanner.setRange(extent.toMetadataRange());
-
     TreeMap<Key,Value> tkv = new TreeMap<Key,Value>();
-    for (Entry<Key,Value> entry : scanner)
-      tkv.put(entry.getKey(), entry.getValue());
+    try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) {
+      scanner.setRange(extent.toMetadataRange());
+      for (Entry<Key,Value> entry : scanner)
+        tkv.put(entry.getKey(), entry.getValue());
+    }

     // only populate map after success
     if (tabletsKeyValues == null) {