ACCUMULO-3320 Integration test to ensure that WALs are not closed while a
tserver may still be using them.

Also tests the converse: once a WAL is no longer in use by a tserver, it
does still get closed.
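
The decision rule these tests pin down, sketched in plain Java (the class,
method, and inputs are illustrative stand-ins, not the actual
CloseWriteAheadLogReferences code):

import java.util.Set;

public class WalCloseRule {
  // Hypothetical distillation of the invariant under test: a WAL's
  // replication Status may only be marked closed once the metadata table no
  // longer references it AND no tserver still reports it as an active log.
  // The metadata check alone is insufficient because a tserver can keep
  // writing to a WAL that no tablet currently references.
  static boolean mayClose(String wal, Set<String> metadataReferencedWals, Set<String> tserverActiveWals) {
    return !metadataReferencedWals.contains(wal) && !tserverActiveWals.contains(wal);
  }
}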


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/09cb6b2a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/09cb6b2a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/09cb6b2a

Branch: refs/heads/master
Commit: 09cb6b2a82d6634c1bd991ed4a0012fb87a459f3
Parents: 84191c5
Author: Josh Elser <[email protected]>
Authored: Sun Nov 9 12:19:05 2014 -0500
Committer: Josh Elser <[email protected]>
Committed: Mon Nov 10 13:34:51 2014 -0800

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  12 +-
 .../core/replication/ReplicationTable.java      |   3 +
 .../CloseWriteAheadLogReferences.java           |  11 +-
 .../master/replication/ReplicationDriver.java   |   6 +
 ...arbageCollectorCommunicatesWithTServers.java | 446 +++++++++++++++++++
 5 files changed, 470 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f59b654..1195668 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -206,12 +206,12 @@ public enum Property {
   TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"),
   @Deprecated
   TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY,
-      "This setting is deprecated. See tserver.total.mutation.queue.max. " 
+      "This setting is deprecated. See tserver.total.mutation.queue.max. "
           + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
           + " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of "
           + "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers "
           + "size is ok because of group commit."),
-  TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY, 
+  TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY,
       "The amount of memory used to store write-ahead-log mutations before flushing them."),
   TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT,
       "To find a tablets split points, all index files are opened. This setting determines how many index "
@@ -502,6 +502,8 @@ public enum Property {
   REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
   REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.UnorderedWorkAssigner", PropertyType.CLASSNAME,
       "Replication WorkAssigner implementation to use"),
+  REPLICATION_DRIVER_DELAY("replication.driver.delay", "0s", PropertyType.TIMEDURATION,
+      "Amount of time to wait before the replication work loop begins in the master."),
   REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work, not useful outside of tests"),
   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work, not useful outside of tests"),
 
@@ -814,7 +816,7 @@ public enum Property {
 
   /**
    * Creates a new instance of a class specified in a configuration property. The table classpath context is used if set.
-   * 
+   *
    * @param conf
    *          configuration containing property
    * @param property
@@ -835,7 +837,7 @@ public enum Property {
 
   /**
    * Creates a new instance of a class specified in a configuration property.
-   * 
+   *
    * @param conf
    *          configuration containing property
    * @param property
@@ -856,7 +858,7 @@ public enum Property {
   /**
    * Collects together properties from the given configuration pertaining to compaction strategies. The relevant properties all begin with the prefix in
    * {@link #TABLE_COMPACTION_STRATEGY_PREFIX}. In the returned map, the prefix is removed from each property's key.
-   * 
+   *
    * @param tableConf
    *          configuration
    * @return map of compaction strategy property keys and values, with the detection prefix removed from each key
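
For context on the new property: TIMEDURATION values are read back in
milliseconds via AccumuloConfiguration.getTimeInMillis(), which is exactly
how the ReplicationDriver hunk below consumes it. A minimal sketch against
the compiled-in defaults (the wrapper class is illustrative):

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;

public class DriverDelayExample {
  public static void main(String[] args) {
    // With no site override, replication.driver.delay keeps its "0s" default
    AccumuloConfiguration conf = DefaultConfiguration.getInstance();
    long delayMs = conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY);
    System.out.println("replication.driver.delay = " + delayMs + "ms");
  }
}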

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
index ec7c202..c6f8ada 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -37,10 +37,12 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableMap;
 
 public class ReplicationTable {
+  private static final Logger log = Logger.getLogger(ReplicationTable.class);
 
   public static final String ID = "+rep";
   public static final String NAME = Namespaces.ACCUMULO_NAMESPACE + ".replication";
@@ -90,6 +92,7 @@ public class ReplicationTable {
 
   public static void setOnline(Connector conn) throws AccumuloSecurityException, AccumuloException {
     try {
+      log.info("Bringing replication table online");
       conn.tableOperations().online(NAME, true);
     } catch (TableNotFoundException e) {
       throw new AssertionError(NAME + " should exist, but doesn't.");
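
The added log line only makes the table transition visible; the call itself
is unchanged. A minimal usage sketch, assuming a Connector for a running
instance (the wrapper method is illustrative):

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.replication.ReplicationTable;

public class SetOnlineExample {
  // Both tests below do this as their very first step so the replication
  // table is online before any GC cycle runs.
  static void bringReplicationTableOnline(Connector conn) throws AccumuloException, AccumuloSecurityException {
    ReplicationTable.setOnline(conn); // now logs "Bringing replication table online"
  }
}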

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb74f18..a41e965 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
@@ -121,6 +122,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
     }
 
     log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString());
+    log.debug("Referenced WALs: " + referencedWals);
     sw.reset();
 
     /*
@@ -202,6 +204,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
         // The value may contain multiple WALs
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 
+        log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+
         // Normalize each log file (using Path) and add it to the set
         for (String logFile : logEntry.logSet) {
           referencedWals.add(normalizedWalPaths.get(logFile));
@@ -251,12 +255,13 @@ public class CloseWriteAheadLogReferences implements Runnable {
         }
 
         // Ignore things that aren't completely replicated as we can't delete those anyways
-        entry.getKey().getRow(replFileText);
-        String replFile = replFileText.toString().substring(ReplicationSection.getRowPrefix().length());
+        MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
+        String replFile = replFileText.toString();
+        boolean isReferenced = referencedWals.contains(replFile);
 
         // We only want to clean up WALs (which is everything but rfiles) and only when
         // metadata doesn't have a reference to the given WAL
-        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !referencedWals.contains(replFile)) {
+        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isReferenced) {
           try {
             closeWal(bw, entry.getKey());
             recordsClosed++;
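
The parsing change above swaps a hand-rolled substring of the ~repl row for
the MetadataSchema.ReplicationSection.getFile() helper. A sketch of the
equivalence (the WAL path and Key construction are illustrative):

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.hadoop.io.Text;

public class ReplRowParsing {
  public static void main(String[] args) {
    // Illustrative WAL path; real rows are written by the replication machinery
    String file = "hdfs://localhost:8020/accumulo/wal/host+9997/some-wal-uuid";
    Key key = new Key(new Text(ReplicationSection.getRowPrefix() + file));

    // Old style: strip the row prefix by hand
    String manual = key.getRow().toString().substring(ReplicationSection.getRowPrefix().length());

    // New style: let the schema helper extract the file
    Text extracted = new Text();
    ReplicationSection.getFile(key, extracted);

    System.out.println(manual.equals(extracted.toString())); // true
  }
}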

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index ea378f4..f822a90 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -55,6 +55,12 @@ public class ReplicationDriver extends Daemon {
   public void run() {
     CountSampler sampler = new CountSampler(10);
 
+    long millisToWait = conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY);
+    log.debug("Waiting " + millisToWait + "ms before starting main replication loop");
+    UtilWaitThread.sleep(millisToWait);
+
+    log.debug("Starting replication loop");
+
     while (master.stillMaster()) {
       if (null == workMaker) {
         try {
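
This delay gives tests (like the one below, which sets it to 240s) a window
in which the master's replication loop stays quiet while they drive the GC
directly. The shape of the change in isolation, as a runnable sketch (plain
Thread.sleep and a literal standing in for UtilWaitThread.sleep and the
configured property):

public class DelayedLoopExample {
  public static void main(String[] args) throws InterruptedException {
    // Stand-in for conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY)
    long millisToWait = 1000;
    System.out.println("Waiting " + millisToWait + "ms before starting main replication loop");
    Thread.sleep(millisToWait);
    System.out.println("Starting replication loop");
    // ... the work loop would run here for as long as the process stays master ...
  }
}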

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
new file mode 100644
index 0000000..dff2726
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientConfigurationHelper;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.AbstractMacIT;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.bouncycastle.util.Arrays;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet
+ * references a WAL is insufficient to determine if a WAL will never be used in the future.
+ */
+public class GarbageCollectorCommunicatesWithTServers extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServers.class);
+
+  private final int GC_PERIOD_SECONDS = 1;
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
+    // Wait longer to try to let the replication table come online before a cycle runs
+    cfg.setProperty(Property.GC_CYCLE_START, "10s");
+    cfg.setProperty(Property.REPLICATION_NAME, "master");
+    // Set really long delays for the master to do stuff for replication
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
+    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
+    cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /**
+   * Fetch all of the WALs referenced by tablets in the metadata table for this table
+   */
+  private Set<String> getWalsForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+
+    Set<String> wals = new HashSet<String>();
+    for (Entry<Key,Value> entry : s) {
+      log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), 
entry.getValue());
+      // hostname:port/uri://path/to/wal
+      String cq = entry.getKey().getColumnQualifier().toString();
+      int index = cq.indexOf('/');
+      // Normalize the path
+      String path = new Path(cq.substring(index + 1)).toString();
+      log.debug("Extracted file: " + path);
+      wals.add(path);
+    }
+
+    return wals;
+  }
+
+  /**
+   * Fetch all of the rfiles referenced by tablets in the metadata table for this table
+   */
+  private Set<String> getFilesForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Set<String> rfiles = new HashSet<String>();
+    for (Entry<Key,Value> entry : s) {
+      log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), 
entry.getValue());
+      // uri://path/to/wal
+      String cq = entry.getKey().getColumnQualifier().toString();
+      String path = new Path(cq).toString();
+      log.debug("Normalize path to rfile: {}", path);
+      rfiles.add(path);
+    }
+
+    return rfiles;
+  }
+
+  /**
+   * Get the replication status messages for the given table that exist in the metadata table (~repl entries)
+   */
+  private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.ReplicationSection.getRange();
+    s.setRange(r);
+    s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
+
+    Map<String,Status> fileToStatus = new HashMap<String,Status>();
+    for (Entry<Key,Value> entry : s) {
+      Text file = new Text();
+      MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+      Status status = Status.parseFrom(entry.getValue().get());
+      log.info("Got status for {}: {}", file, ProtobufUtil.toString(status));
+      fileToStatus.put(file.toString(), status);
+    }
+
+    return fileToStatus;
+  }
+
+  @Test
+  public void testActiveWalPrecludesClosing() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.flush();
+
+    log.info("Checking that metadata only has one WAL recorded for this 
table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find one WAL for the table", 1, 
wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the 
tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Waiting for replication table to come online");
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status 
message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    Assert.assertEquals("Expected log file name from tablet to equal 
replication entry", wals.iterator().next(), walName);
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertEquals("Expected Status for file to not be closed", false, 
status.getClosed());
+
+    log.info("Checking to see that log entries are removed from tablet section 
after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, 
walsAfterMinc.size());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, 
filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, 
filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to 
differ", filesForTableAfterCompaction, filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the 
GC has ran
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
+
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", 
fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+    // for our WAL should not be altered.
+
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + 
walsAfterMajc, 0, walsAfterMajc.size());
+
+    Map<String,Status> fileToStatusAfterMinc = 
getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status 
message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+    Assert.assertEquals("Status before and after MinC should be identical", 
fileToStatus, fileToStatusAfterMinc);
+  }
+
+  @Test
+  public void testUnreferencedWalInTserverIsClosed() throws Exception {
+    final String[] names = getUniqueNames(2);
+    // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
+    final String table = names[0], otherTable = names[1];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.close();
+
+    log.info("Checking that metadata only has one WAL recorded for this 
table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find one WAL for the table", 1, 
wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the 
tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Waiting for replication table to come online");
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status 
message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    Assert.assertEquals("Expected log file name from tablet to equal 
replication entry", wals.iterator().next(), walName);
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertEquals("Expected Status for file to not be closed", false, 
status.getClosed());
+
+    log.info("Checking to see that log entries are removed from tablet section 
after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, 
walsAfterMinc.size());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, 
filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, 
filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to 
differ", filesForTableAfterCompaction, filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the 
GC has ran
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
+
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", 
fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+    // for our WAL should not be altered.
+
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + 
walsAfterMajc, 0, walsAfterMajc.size());
+
+    Map<String,Status> fileToStatusAfterMinc = 
getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status 
message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+    Assert.assertEquals("Status before and after MinC should be identical", 
fileToStatus, fileToStatusAfterMinc);
+
+    /*
+     * To verify that the WAL is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to
+     * do this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of
+     */
+
+    conn.tableOperations().create(otherTable);
+    bw = conn.createBatchWriter(otherTable, null);
+    // 500k
+    byte[] bigValue = new byte[1024 * 500];
+    Arrays.fill(bigValue, (byte)1);
+    // 500k * 50
+    for (int i = 0; i < 50; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, bigValue);
+      bw.addMutation(m);
+      if (i % 10 == 0) {
+        bw.flush();
+      }
+    }
+
+    bw.close();
+
+    conn.tableOperations().flush(otherTable, null, null, true);
+
+    final TCredentials tcreds = new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)).toThrift(conn.getInstance());
+
+    // Get the tservers which the master deems as active
+    List<String> tservers = MasterClient.execute(conn.getInstance(), new 
ClientExecReturn<List<String>,MasterClientService.Client>() {
+      @Override
+      public List<String> execute(MasterClientService.Client client) throws 
Exception {
+        return client.getActiveTservers(Tracer.traceInfo(), tcreds);
+      }
+    });
+
+    Assert.assertEquals("Expected only one active tservers", 1, 
tservers.size());
+
+    String tserver = tservers.get(0);
+    AccumuloConfiguration rpcConfig = 
ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance());
+
+    // Get the active WALs from that server
+    log.info("Fetching active WALs from {}", tserver);
+
+    Client client = ThriftUtil.getTServerClient(tserver, rpcConfig);
+    List<String> activeWalsForTserver = 
client.getActiveLogs(Tracer.traceInfo(), tcreds);
+
+    log.info("Active wals: {}", activeWalsForTserver);
+
+    Assert.assertEquals("Expected to find only one active WAL", 1, 
activeWalsForTserver.size());
+
+    String activeWal = new Path(activeWalsForTserver.get(0)).toString();
+
+    Assert.assertNotEquals("Current active WAL on tserver should not be the 
original WAL we saw", walName, activeWal);
+
+    log.info("Ensuring that replication status does get closed after WAL is no 
longer in use by Tserver");
+
+    do {
+      Map<String,Status> replicationStatuses = 
getMetadataStatusForTable(table);
+
+      log.info("Got replication status messages {}", replicationStatuses);
+      Assert.assertEquals("Did not expect to find additional status records", 
1, replicationStatuses.size());
+
+      status = replicationStatuses.values().iterator().next();
+      log.info("Current status: {}", ProtobufUtil.toString(status));
+
+      if (status.getClosed()) {
+        return;
+      }
+
+      log.info("Status is not yet closed, waiting for garbage collector to 
close it");
+
+      Thread.sleep(2000);
+    } while (true);
+  }
+}
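
Condensed from the second test above: the tserver-side RPC that reveals
which WALs are still active, i.e. the communication channel this test (and
the GC) rely on. A sketch reusing only calls that appear in the diff; the
tserver address, RPC configuration, and credentials are assumed to be
supplied by a running test context:

import java.util.List;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.ThriftUtil;

public class ActiveWalQuery {
  // Ask a tablet server which WALs it still considers active; the test uses
  // this to prove the original WAL was rolled before polling for its Status
  // to be closed.
  static List<String> activeWals(String tserver, AccumuloConfiguration rpcConfig, TCredentials tcreds) throws Exception {
    Client client = ThriftUtil.getTServerClient(tserver, rpcConfig);
    return client.getActiveLogs(Tracer.traceInfo(), tcreds);
  }
}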
