ACCUMULO-3320 Integration test to ensure that WALs are not closed when a tserver may continue to use them. Also test the converse: after a WAL is no longer in use by a tserver, it is eventually closed.
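For context, the new replication.driver.delay property added below lets a test stall the master's replication loop so that only the garbage collector mutates WAL state. A condensed sketch of the kind of MiniAccumuloCluster configuration the test uses (the class name here is illustrative; the properties and the 240s/1M values come from the diff below):

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.hadoop.conf.Configuration;

public class ExampleWalGcIT extends ConfigurableMacIT { // illustrative name
  @Override
  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
    cfg.setNumTservers(1);
    // New in this commit: hold off the master's replication loop entirely...
    cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
    // ...and the other replication timers, so only the GC touches WAL state
    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
    // A small WAL so a modest write load forces the tserver to roll a new log
    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
  }
}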
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/09cb6b2a Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/09cb6b2a Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/09cb6b2a Branch: refs/heads/master Commit: 09cb6b2a82d6634c1bd991ed4a0012fb87a459f3 Parents: 84191c5 Author: Josh Elser <[email protected]> Authored: Sun Nov 9 12:19:05 2014 -0500 Committer: Josh Elser <[email protected]> Committed: Mon Nov 10 13:34:51 2014 -0800 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 12 +- .../core/replication/ReplicationTable.java | 3 + .../CloseWriteAheadLogReferences.java | 11 +- .../master/replication/ReplicationDriver.java | 6 + ...arbageCollectorCommunicatesWithTServers.java | 446 +++++++++++++++++++ 5 files changed, 470 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f59b654..1195668 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -206,12 +206,12 @@ public enum Property { TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"), @Deprecated TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY, - "This setting is deprecated. See tserver.total.mutation.queue.max. " + "This setting is deprecated. See tserver.total.mutation.queue.max. " + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the" + " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of " + "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers " + "size is ok because of group commit."), - TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY, + TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY, "The amount of memory used to store write-ahead-log mutations before flushing them."), TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT, "To find a tablets split points, all index files are opened.
This setting determines how many index " @@ -502,6 +502,8 @@ public enum Property { REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"), REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.UnorderedWorkAssigner", PropertyType.CLASSNAME, "Replication WorkAssigner implementation to use"), + REPLICATION_DRIVER_DELAY("replication.driver.delay", "0s", PropertyType.TIMEDURATION, + "Amount of time to wait before the replication work loop begins in the master."), REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work, not useful outside of tests"), REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work, not useful outside of tests"), @@ -814,7 +816,7 @@ public enum Property { /** * Creates a new instance of a class specified in a configuration property. The table classpath context is used if set. - * + * * @param conf * configuration containing property * @param property @@ -835,7 +837,7 @@ public enum Property { /** * Creates a new instance of a class specified in a configuration property. - * + * * @param conf * configuration containing property * @param property @@ -856,7 +858,7 @@ public enum Property { /** * Collects together properties from the given configuration pertaining to compaction strategies. The relevant properties all begin with the prefix in * {@link #TABLE_COMPACTION_STRATEGY_PREFIX}. In the returned map, the prefix is removed from each property's key. - * + * * @param tableConf * configuration * @return map of compaction strategy property keys and values, with the detection prefix removed from each key http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java index ec7c202..c6f8ada 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java @@ -37,10 +37,12 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; import com.google.common.collect.ImmutableMap; public class ReplicationTable { + private static final Logger log = Logger.getLogger(ReplicationTable.class); public static final String ID = "+rep"; public static final String NAME = Namespaces.ACCUMULO_NAMESPACE + ".replication"; @@ -90,6 +92,7 @@ public class ReplicationTable { public static void setOnline(Connector conn) throws AccumuloSecurityException, AccumuloException { try { + log.info("Bringing replication table online"); conn.tableOperations().online(NAME, true); } catch (TableNotFoundException e) { throw new AssertionError(NAME + " should exist, but doesn't."); http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java 
---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index cb74f18..a41e965 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; @@ -121,6 +122,7 @@ public class CloseWriteAheadLogReferences implements Runnable { } log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString()); + log.debug("Referenced WALs: " + referencedWals); sw.reset(); /* @@ -202,6 +204,8 @@ public class CloseWriteAheadLogReferences implements Runnable { // The value may contain multiple WALs LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); + log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet); + // Normalize each log file (using Path) and add it to the set for (String logFile : logEntry.logSet) { referencedWals.add(normalizedWalPaths.get(logFile)); @@ -251,12 +255,13 @@ public class CloseWriteAheadLogReferences implements Runnable { } // Ignore things that aren't completely replicated as we can't delete those anyways - entry.getKey().getRow(replFileText); - String replFile = replFileText.toString().substring(ReplicationSection.getRowPrefix().length()); + MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText); + String replFile = replFileText.toString(); + boolean isReferenced = referencedWals.contains(replFile); // We only want to clean up WALs (which is everything but rfiles) and only when // metadata doesn't have a reference to the given WAL - if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !referencedWals.contains(replFile)) { + if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isReferenced) { try { closeWal(bw, entry.getKey()); recordsClosed++; http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java index ea378f4..f822a90 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java @@ -55,6 +55,12 @@ public class ReplicationDriver extends Daemon { public void run() { CountSampler sampler = new CountSampler(10); + long millisToWait = conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY); + log.debug("Waiting " + millisToWait + "ms before starting main replication 
loop"); + UtilWaitThread.sleep(millisToWait); + + log.debug("Starting replication loop"); + while (master.stillMaster()) { if (null == workMaker) { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java new file mode 100644 index 0000000..dff2726 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.ClientConfigurationHelper; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.AbstractMacIT; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.bouncycastle.util.Arrays; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet references a + * WAL is insufficient to determine if a WAL will never be used in the future. + */ +public class GarbageCollectorCommunicatesWithTServers extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServers.class); + + private final int GC_PERIOD_SECONDS = 1; + + @Override + public int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); + // Wait longer to try to let the replication table come online before a cycle runs + cfg.setProperty(Property.GC_CYCLE_START, "10s"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + // Set really long delays for the master to do stuff for replication + cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s"); + cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Fetch all of the WALs referenced by tablets in the metadata table for this table + */ + private Set<String> getWalsForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + + Set<String> wals = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // hostname:port/uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + int index = cq.indexOf('/'); + // Normalize the path + String path = new Path(cq.substring(index + 1)).toString(); + log.debug("Extracted file: " + path); + wals.add(path); + } + + return wals; + } + + /** + * Fetch all of the rfiles referenced by tablets in the metadata table for this table + */ + private Set<String> getFilesForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + Set<String> rfiles = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // uri://path/to/rfile + String cq = 
entry.getKey().getColumnQualifier().toString(); + String path = new Path(cq).toString(); + log.debug("Normalize path to rfile: {}", path); + rfiles.add(path); + } + + return rfiles; + } + + /** + * Get the replication status messages for the given table that exist in the metadata table (~repl entries) + */ + private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.ReplicationSection.getRange(); + s.setRange(r); + s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId)); + + Map<String,Status> fileToStatus = new HashMap<String,Status>(); + for (Entry<Key,Value> entry : s) { + Text file = new Text(); + MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); + Status status = Status.parseFrom(entry.getValue().get()); + log.info("Got status for {}: {}", file, ProtobufUtil.toString(status)); + fileToStatus.put(file.toString(), status); + } + + return fileToStatus; + } + + @Test + public void testActiveWalPrecludesClosing() throws Exception { + final String table = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.flush(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Waiting for replication table to come online"); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = 
getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has run + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + } + + @Test + public void testUnreferencedWalInTserverIsClosed() throws Exception { + final String[] names = getUniqueNames(2); + // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver + final String table = names[0], otherTable = names[1]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.close(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Waiting for replication table to come online"); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = 
getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has run + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + + /* + * To verify that the WAL still gets closed, we have to force the tserver to close the existing WAL and open a new one instead. 
The easiest way to do + * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of + */ + + conn.tableOperations().create(otherTable); + bw = conn.createBatchWriter(otherTable, null); + // 500k + byte[] bigValue = new byte[1024 * 500]; + Arrays.fill(bigValue, (byte)1); + // 500k * 50 + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, bigValue); + bw.addMutation(m); + if (i % 10 == 0) { + bw.flush(); + } + } + + bw.close(); + + conn.tableOperations().flush(otherTable, null, null, true); + + final TCredentials tcreds = new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)).toThrift(conn.getInstance()); + + // Get the tservers which the master deems as active + List<String> tservers = MasterClient.execute(conn.getInstance(), new ClientExecReturn<List<String>,MasterClientService.Client>() { + @Override + public List<String> execute(MasterClientService.Client client) throws Exception { + return client.getActiveTservers(Tracer.traceInfo(), tcreds); + } + }); + + Assert.assertEquals("Expected only one active tserver", 1, tservers.size()); + + String tserver = tservers.get(0); + AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance()); + + // Get the active WALs from that server + log.info("Fetching active WALs from {}", tserver); + + Client client = ThriftUtil.getTServerClient(tserver, rpcConfig); + List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), tcreds); + + log.info("Active wals: {}", activeWalsForTserver); + + Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size()); + + String activeWal = new Path(activeWalsForTserver.get(0)).toString(); + + Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal); + + log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver"); + + do { + Map<String,Status> replicationStatuses = getMetadataStatusForTable(table); + + log.info("Got replication status messages {}", replicationStatuses); + Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size()); + + status = replicationStatuses.values().iterator().next(); + log.info("Current status: {}", ProtobufUtil.toString(status)); + + if (status.getClosed()) { + return; + } + + log.info("Status is not yet closed, waiting for garbage collector to close it"); + + Thread.sleep(2000); + } while (true); + } +}
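The tserver interrogation in the second test generalizes: the question the GC needs answered is simply "is this WAL still active on some live tserver?". Below is a minimal sketch of that lookup, assuming only the client APIs the test above already exercises; the helper class itself is hypothetical and not part of this commit.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.impl.ClientConfigurationHelper;
import org.apache.accumulo.core.client.impl.ClientExecReturn;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.fs.Path;

public class ActiveWals { // hypothetical helper, not part of this commit
  /**
   * Collect every WAL that some live tserver reports as active. A WAL absent
   * from this set is one whose replication Status the GC may safely close.
   */
  public static Set<String> fetch(Connector conn, final TCredentials tcreds) throws Exception {
    // Ask the master which tservers it currently considers live
    List<String> tservers = MasterClient.execute(conn.getInstance(),
        new ClientExecReturn<List<String>,MasterClientService.Client>() {
          @Override
          public List<String> execute(MasterClientService.Client client) throws Exception {
            return client.getActiveTservers(Tracer.traceInfo(), tcreds);
          }
        });

    AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance());
    Set<String> wals = new HashSet<String>();
    for (String tserver : tservers) {
      // Ask each tserver directly for the WALs it still holds open
      Client client = ThriftUtil.getTServerClient(tserver, rpcConfig);
      for (String wal : client.getActiveLogs(Tracer.traceInfo(), tcreds)) {
        wals.add(new Path(wal).toString());
      }
    }
    return wals;
  }
}

As in the test, each returned WAL is normalized through Path so that tserver-reported URIs compare equal to the paths recorded in the metadata table.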
