Repository: accumulo
Updated Branches:
  refs/heads/1.7 661dac336 -> 8a3cc4f04
  refs/heads/1.8 d28a3ee3e -> 55795c1ca
  refs/heads/master 159560979 -> cca8b896e


ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw a
dead tserver; however, a new instance of GarbageCollectWriteAheadLogs
is created for each GC cycle, so this state was lost between cycles and
a dead tserver never appeared to have been down long enough for its WALs
to be removed. The map is now owned by SimpleGarbageCollector, which
passes it into each cycle's GarbageCollectWriteAheadLogs instance.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <els...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9edda321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9edda321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9edda321

Branch: refs/heads/1.7
Commit: 9edda3215974d7ee284ecfa08b69fcd1ef686d19
Parents: 661dac3
Author: Adam J. Shook <adamjsh...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed Aug 31 14:30:33 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 32 +++++----
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b57b8fc..339b233 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -83,7 +83,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new 
HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -96,11 +96,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager 
fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager 
fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 98acf9a..879d6b9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -530,6 +531,9 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
 
     ProbabilitySampler sampler = new 
ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track 
state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       Trace.on("gc", sampler);
 
@@ -574,7 +578,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new 
GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new 
GarbageCollectWriteAheadLogs(this, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index bc9fca3..940b922 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -107,6 +107,7 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Rule
   public TestName testName = new TestName();
@@ -151,7 +152,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     replay(instance, factory, siteConfig);
     AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false, 
firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -393,8 +395,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private List<Entry<Key,Value>> replData;
 
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean 
useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean 
useTrash, Map<HostAndPort,Long> firstSeenDead, List<Entry<Key,Value>> replData)
+        throws IOException {
+      super(context, fs, useTrash, firstSeenDead);
       this.replData = replData;
     }
 
@@ -413,7 +416,7 @@ public class GarbageCollectWriteAheadLogsTest {
     LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
     replData.add(Maps.immutableEntry(new Key("/wals/" + file1, 
StatusSection.NAME.toString(), "1"), 
StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 
-    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, 
replData);
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, 
firstSeenDead, replData);
 
     replay(conn);
 
@@ -442,7 +445,7 @@ public class GarbageCollectWriteAheadLogsTest {
     Instance inst = new MockInstance(testName.getMethodName());
     AccumuloServerContext context = new AccumuloServerContext(new 
ServerConfigurationFactory(inst));
 
-    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -485,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     Connector conn = context.getConnector();
 
-    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -534,7 +537,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, 
wal);
     Entry<Key,Value> entry = Iterables.getOnlyElement(data);
@@ -562,7 +565,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, 
wal);
     Map<Key,Value> data = new HashMap<>();
@@ -666,8 +669,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, 
boolean useTrash, boolean holdLock) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, 
boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock)
+        throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -701,7 +705,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean 
locked) throws IOException {
     AccumuloServerContext ctx = new AccumuloServerContext(new 
ServerConfigurationFactory(new MockInstance("accumulo")));
-    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, 
firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -795,8 +799,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, 
VolumeManager vm, boolean useTrash) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, 
VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws 
IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -846,7 +850,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new 
GCWALDeadTserverCollectMock(ctx, vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new 
GCWALDeadTserverCollectMock(ctx, vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), 
new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 202bfac..3e8abe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,15 +31,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_START, "1");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
     cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.GC_WAL_DEAD_SERVER_WAIT, "1s");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 
@@ -107,6 +114,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, 
InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, 
getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -139,6 +150,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", 
Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = 
getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = 
walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), 
SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning 
garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", 
output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -284,6 +341,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return Iterators.size(Arrays.asList(fs.globStatus(path)).iterator());
   }
 
+  private List<String> getWALsForTableId(String tableId) throws 
TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", 
Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws 
Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), 
MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, 
bwOpts.getBatchWriterConfig());

Reply via email to