ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw a
dead tserver. However, a new instance of GarbageCollectWriteAheadLogs
is created during each cycle of the GC, so the state information is
lost between cycles. The state is now managed by SimpleGarbageCollector,
which passes it into each cycle's GarbageCollectWriteAheadLogs instance.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <els...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d4a46a0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d4a46a0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d4a46a0

Branch: refs/heads/master
Commit: 9d4a46a0c207bf89372aba585edbb65d4e141ee4
Parents: 679a1c3
Author: Adam J. Shook <adamjsh...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Fri Sep 2 16:08:51 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 16 +++--
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 87 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a22a34e..3a8169e 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -64,7 +64,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final Instance instance;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new 
HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -77,11 +77,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean 
useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean 
useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.instance = instance;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 0191f38..36ca17a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -535,6 +536,9 @@ public class SimpleGarbageCollector implements Iface {
 
     Sampler sampler = new CountSampler(100);
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track 
state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       if (sampler.next())
         Trace.on("gc");
@@ -568,7 +572,7 @@ public class SimpleGarbageCollector implements Iface {
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new 
GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new 
GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 03f5c96..24efa09 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -77,12 +77,14 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Before
   public void setUp() throws Exception {
     instance = createMock(Instance.class);
     volMgr = createMock(VolumeManager.class);
-    gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false, 
firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -344,8 +346,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, 
boolean holdLock) throws IOException {
-      super(i, vm, useTrash);
+    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, 
Map<HostAndPort,Long> firstSeenDead, boolean holdLock) throws IOException {
+      super(i, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -378,7 +380,7 @@ public class GarbageCollectWriteAheadLogsTest {
   }
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean 
locked) throws IOException {
-    return new GCWALPartialMock(new MockInstance("accumulo"), 
VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(new MockInstance("accumulo"), 
VolumeManagerImpl.get(), false, firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -472,8 +474,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean 
useTrash) throws IOException {
-      super(i, vm, useTrash);
+    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean 
useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
+      super(i, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -522,7 +524,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, 
vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, 
vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), 
new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
 
b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index b7f3396..94cd1fc 100644
--- 
a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ 
b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,15 +32,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     settings.put(Property.GC_CYCLE_START.getKey(), "1");
     settings.put(Property.GC_CYCLE_DELAY.getKey(), "1");
     settings.put(Property.GC_PORT.getKey(), "0");
+    settings.put(Property.GC_WAL_DEAD_SERVER_WAIT.getKey(), "1s");
     settings.put(Property.TSERV_MAXMEM.getKey(), "5K");
     settings.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
     cfg.setSiteConfig(settings);
@@ -108,6 +115,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, 
InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, 
getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -138,6 +149,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", 
Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = 
getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = 
walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), 
SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning 
garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", 
output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -288,6 +345,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return result;
   }
 
+  private List<String> getWALsForTableId(String tableId) throws 
TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", 
Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws 
Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), 
MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, 
bwOpts.getBatchWriterConfig());

Reply via email to