Updated Branches: refs/heads/1.6.0-SNAPSHOT f0f227a79 -> cbbcaac88
ACCUMULO-2104 ACCUMULO-2106 Close the MTBW before deleting the tables in RW

If we don't close the MTBW, we have the potential to have failures be retried after we delete the tables (in the teardown of the Fixture). This is, of course, besides the fact that a long-running RW client will just leak resources like mad.

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cbbcaac8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cbbcaac8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cbbcaac8
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: cbbcaac8889e4053f10d215b2fdfb09a085bffe3
Parents: f0f227a
Author: Josh Elser <els...@apache.org>
Authored: Fri Dec 27 22:52:51 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Fri Dec 27 22:52:51 2013 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/test/randomwalk/State.java | 13 +++++++++++++
.../accumulo/test/randomwalk/image/ImageFixture.java | 15 +++++++++++++++
.../randomwalk/multitable/MultiTableFixture.java | 14 ++++++++++++++
.../randomwalk/sequential/SequentialFixture.java | 14 ++++++++++++++
.../accumulo/test/randomwalk/shard/ShardFixture.java | 15 +++++++++++++++
5 files changed, 71 insertions(+)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/cbbcaac8/test/src/main/java/org/apache/accumulo/test/randomwalk/State.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/State.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/State.java index fcf9b25..5998eef 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/State.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/State.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.ClientConfiguration; import
org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -141,6 +142,18 @@ public class State { return mtbw; } + public boolean isMultiTableBatchWriterInitialized() { + return mtbw != null; + } + + public void resetMultiTableBatchWriter() { + if (!mtbw.isClosed()) { + log.warn("Setting non-closed MultiTableBatchWriter to null (leaking resources)"); + } + + mtbw = null; + } + public String getMapReduceJars() { String acuHome = System.getenv("ACCUMULO_HOME"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/cbbcaac8/test/src/main/java/org/apache/accumulo/test/randomwalk/image/ImageFixture.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/image/ImageFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/image/ImageFixture.java index 5dc8928..744c761 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/image/ImageFixture.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/image/ImageFixture.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.test.randomwalk.Fixture; @@ -105,7 +107,20 @@ public class ImageFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have 
resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } + // Now we can safely delete the tables log.debug("Dropping tables: " + imageTableName + " " + indexTableName); Connector conn = state.getConnector(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/cbbcaac8/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java index 01ebdd7..7ea2eb3 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/multitable/MultiTableFixture.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import java.util.ArrayList; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.test.randomwalk.Fixture; import org.apache.accumulo.test.randomwalk.State; @@ -40,6 +42,18 @@ public class MultiTableFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // 
Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } Connector conn = state.getConnector(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/cbbcaac8/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/SequentialFixture.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/SequentialFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/SequentialFixture.java index 2b89264..2a5cfa1 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/SequentialFixture.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/SequentialFixture.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.test.randomwalk.Fixture; @@ -55,6 +57,18 @@ public class SequentialFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } log.debug("Dropping tables: " + seqTableName); http://git-wip-us.apache.org/repos/asf/accumulo/blob/cbbcaac8/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java ---------------------------------------------------------------------- diff --git 
a/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java index 8de30ae..a54229f 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/ShardFixture.java @@ -22,6 +22,8 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.test.randomwalk.Fixture; import org.apache.accumulo.test.randomwalk.State; @@ -94,6 +96,19 @@ public class ShardFixture extends Fixture { @Override public void tearDown(State state) throws Exception { + // We have resources we need to clean up + if (state.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = state.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + state.resetMultiTableBatchWriter(); + } + Connector conn = state.getConnector(); conn.tableOperations().delete((String) state.get("indexTableName"));