Repository: accumulo Updated Branches: refs/heads/1.6 679a1c3eb -> 768a73c50 refs/heads/1.7 cead3978f -> e08b538eb refs/heads/1.8 fbcaa9027 -> 31c3a7982 refs/heads/master 184a18f70 -> 0d2b9bdcc
ACCUMULO-4428 Fix state of GC firstSeenDead map The GC's map of host->timestamp is used to track when it has first seen a dead tserver, however a new instance of GarbageCollectWriteAheadLogs is used during each cycle of the GC. The state information is lost. The state is now managed by SimpleGarbageCollector, passing this state into each cycle of the GCWriteAheadLogs class. Closes apache/accumulo#143 Signed-off-by: Josh Elser <els...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d4a46a0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d4a46a0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d4a46a0 Branch: refs/heads/1.6 Commit: 9d4a46a0c207bf89372aba585edbb65d4e141ee4 Parents: 679a1c3 Author: Adam J. Shook <adamjsh...@gmail.com> Authored: Wed Aug 31 13:45:59 2016 -0400 Committer: Josh Elser <els...@apache.org> Committed: Fri Sep 2 16:08:51 2016 -0400 ---------------------------------------------------------------------- .../gc/GarbageCollectWriteAheadLogs.java | 7 +- .../accumulo/gc/SimpleGarbageCollector.java | 6 +- .../gc/GarbageCollectWriteAheadLogsTest.java | 16 +++-- .../test/functional/GarbageCollectorIT.java | 68 ++++++++++++++++++++ 4 files changed, 87 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index a22a34e..3a8169e 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -64,7 
+64,7 @@ public class GarbageCollectWriteAheadLogs { private final Instance instance; private final VolumeManager fs; - private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>(); + private final Map<HostAndPort,Long> firstSeenDead; private boolean useTrash; @@ -77,11 +77,14 @@ public class GarbageCollectWriteAheadLogs { * volume manager to use * @param useTrash * true to move files to trash rather than delete them + * @param firstSeenDead + * mutable map of a host to when it was first seen dead */ - GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean useTrash) throws IOException { + GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException { this.instance = instance; this.fs = fs; this.useTrash = useTrash; + this.firstSeenDead = firstSeenDead; } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 0191f38..36ca17a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -19,6 +19,7 @@ package org.apache.accumulo.gc; import java.io.FileNotFoundException; import java.io.IOException; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -535,6 +536,9 @@ public class SimpleGarbageCollector implements Iface { Sampler sampler = new CountSampler(100); + // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track state of when a tablet server went down + final Map<HostAndPort,Long> firstSeenDead = new HashMap<>(); 
+ while (true) { if (sampler.next()) Trace.on("gc"); @@ -568,7 +572,7 @@ public class SimpleGarbageCollector implements Iface { // Clean up any unused write-ahead logs Span waLogs = Trace.start("walogs"); try { - GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash()); + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash(), firstSeenDead); log.info("Beginning garbage collection of write-ahead logs"); walogCollector.collect(status); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 03f5c96..24efa09 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -77,12 +77,14 @@ public class GarbageCollectWriteAheadLogsTest { private VolumeManager volMgr; private GarbageCollectWriteAheadLogs gcwal; private long modTime; + private Map<HostAndPort,Long> firstSeenDead; @Before public void setUp() throws Exception { instance = createMock(Instance.class); volMgr = createMock(VolumeManager.class); - gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false); + firstSeenDead = new HashMap<>(); + gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false, firstSeenDead); modTime = System.currentTimeMillis(); } @@ -344,8 +346,8 @@ public class GarbageCollectWriteAheadLogsTest { private boolean holdsLockBool = false; - public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException { - super(i, vm, useTrash); + 
public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock) throws IOException { + super(i, vm, useTrash, firstSeenDead); this.holdsLockBool = holdLock; } @@ -378,7 +380,7 @@ public class GarbageCollectWriteAheadLogsTest { } private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException { - return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked); + return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, firstSeenDead, locked); } private Map<String,Path> getEmptyMap() { @@ -472,8 +474,8 @@ public class GarbageCollectWriteAheadLogsTest { class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs { - public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException { - super(i, vm, useTrash); + public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException { + super(i, vm, useTrash, firstSeenDead); } @Override @@ -522,7 +524,7 @@ public class GarbageCollectWriteAheadLogsTest { try { VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString()); - GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false); + GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false, firstSeenDead); GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); gcwal2.collect(status); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d4a46a0/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 
b7f3396..94cd1fc 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -30,15 +32,19 @@ import java.util.Map.Entry; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.BatchWriterOpts; import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; @@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT { settings.put(Property.GC_CYCLE_START.getKey(), "1"); settings.put(Property.GC_CYCLE_DELAY.getKey(), "1"); settings.put(Property.GC_PORT.getKey(), "0"); + settings.put(Property.GC_WAL_DEAD_SERVER_WAIT.getKey(), "1s"); settings.put(Property.TSERV_MAXMEM.getKey(), "5K"); settings.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); cfg.setSiteConfig(settings); @@ -108,6 +115,10 @@ public 
class GarbageCollectorIT extends ConfigurableMacIT { assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)); } + private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException { + getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next()); + } + @Test public void gcTest() throws Exception { killMacGc(); @@ -138,6 +149,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT { } @Test + public void gcDeleteDeadTServerWAL() throws Exception { + // Kill GC process + killMacGc(); + + // Create table and ingest data + Connector c = getConnector(); + c.tableOperations().create("test_ingest"); + c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); + String tableId = getConnector().tableOperations().tableIdMap().get("test_ingest"); + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + vopts.rows = opts.rows = 10000; + vopts.cols = opts.cols = 1; + opts.setPrincipal("root"); + vopts.setPrincipal("root"); + TestIngest.ingest(c, opts, new BatchWriterOpts()); + + // Test WAL log has been created + List<String> walsBefore = getWALsForTableId(tableId); + Assert.assertEquals("Should be one WAL", 1, walsBefore.size()); + + // Flush and check for no WAL logs + c.tableOperations().flush("test_ingest", null, null, true); + List<String> walsAfter = getWALsForTableId(tableId); + Assert.assertEquals("Should be no WALs", 0, walsAfter.size()); + + // Validate WAL file still exists + String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", ""); + File wf = new File(walFile); + Assert.assertEquals("WAL file does not exist", true, wf.exists()); + + // Kill TServer and give it some time to die and master to rebalance + killMacTServer(); + UtilWaitThread.sleep(5000); + + // Restart GC and let it run + Process gc = 
getCluster().exec(SimpleGarbageCollector.class); + UtilWaitThread.sleep(60000); + + // Then check the log for proper events + String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc); + assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs")); + assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server")); + } + + @Test public void gcLotsOfCandidatesIT() throws Exception { killMacGc(); @@ -288,6 +345,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT { return result; } + private List<String> getWALsForTableId(String tableId) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + Scanner scanner = getConnector().createScanner("accumulo.metadata", Authorizations.EMPTY); + scanner.setRange(Range.prefix(new Text(tableId))); + scanner.fetchColumnFamily(new Text("log")); + List<String> walsList = new ArrayList<String>(); + for (Entry<Key,Value> e : scanner) { + walsList.add(e.getValue().toString()); + } + return walsList; + } + public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception { conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());