ACCUMULO-2575 Also check the metadata table to account for status updates that haven't made it to the replication table yet
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2372c71d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2372c71d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2372c71d Branch: refs/heads/ACCUMULO-378 Commit: 2372c71df8ea7aa5299a93481d6b84088cc85049 Parents: b5cd35a Author: Josh Elser <els...@apache.org> Authored: Thu May 8 13:26:09 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 8 13:26:09 2014 -0400 ---------------------------------------------------------------------- .../gc/GarbageCollectWriteAheadLogs.java | 18 +++-- .../gc/GarbageCollectWriteAheadLogsTest.java | 76 ++++++++++++++++++-- 2 files changed, 86 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2372c71d/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index c373dd7..6e34e43 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -41,9 +41,12 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; 
+import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; @@ -67,6 +70,7 @@ import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; +import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; @@ -392,14 +396,20 @@ public class GarbageCollectWriteAheadLogs { } protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException { - Scanner s = ReplicationTable.getScanner(conn); + Scanner replScanner = ReplicationTable.getScanner(conn); // Scan only the Status records - StatusSection.limit(s); + StatusSection.limit(replScanner); // Only look for this specific WAL - s.setRange(Range.exact(wal)); + replScanner.setRange(Range.exact(wal)); - return s; + Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + // Need to add in the replication section prefix + metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal)); + // Limit the column family to be sure + metaScanner.fetchColumnFamily(ReplicationSection.COLF); + + return Iterables.concat(replScanner, metaScanner); } private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2372c71d/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 11390f5..749fc96 100644 --- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -46,6 +46,9 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.StatusUtil; @@ -56,9 +59,12 @@ import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import com.google.common.collect.Maps; @@ -76,7 +82,9 @@ public class GarbageCollectWriteAheadLogsTest { private VolumeManager volMgr; private GarbageCollectWriteAheadLogs gcwal; private long modTime; - private GCStatus status; + + @Rule + public TestName testName = new TestName(); @Before public void setUp() throws Exception { @@ -84,7 +92,6 @@ public class GarbageCollectWriteAheadLogsTest { volMgr = createMock(VolumeManager.class); gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false); modTime = System.currentTimeMillis(); - status = createMock(GCStatus.class); } @Test @@ -290,6 +297,7 @@ public class GarbageCollectWriteAheadLogsTest { assertFalse(GarbageCollectWriteAheadLogs.isUUID(null)); } + // It was easier to do this than get the mocking working for me private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs { private 
List<Entry<Key,Value>> replData; @@ -345,9 +353,11 @@ public class GarbageCollectWriteAheadLogsTest { public void removeReplicationEntries() throws Exception { String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); - Instance inst = new MockInstance("removeReplicationEntries"); + Instance inst = new MockInstance(testName.getMethodName()); Credentials creds = new Credentials("root", new PasswordToken("")); Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); ReplicationTable.create(conn); @@ -359,11 +369,69 @@ public class GarbageCollectWriteAheadLogsTest { StatusSection.add(m, new Text("1"), StatusUtil.newFileValue()); bw.addMutation(m); + // These WALs are potential candidates for deletion from fs Map<String,Path> nameToFileMap = new HashMap<>(); + nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + Map<String,Path> sortedWALogs = Collections.emptyMap(); - gcwal.removeReplicationEntries(nameToFileMap, sortedWALogs, this.status, creds); + // Make the GCStatus and GcCycleStats + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + + // Both should have been deleted + Assert.assertEquals(0, nameToFileMap.size()); + } + + @Test + public void replicationEntriesOnlyInMetaPreventGC() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + + Instance inst = new MockInstance(testName.getMethodName()); + Credentials creds = new Credentials("root", new PasswordToken("")); + Connector conn = 
inst.getConnector(creds.getPrincipal(), creds.getToken()); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); + + ReplicationTable.create(conn); + + // Write some records to the metadata table, we haven't yet written status records to the replication table + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue()); + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue()); + bw.addMutation(m); + + // These WALs are potential candidates for deletion from fs + Map<String,Path> nameToFileMap = new HashMap<>(); + nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + + Map<String,Path> sortedWALogs = Collections.emptyMap(); + + // Make the GCStatus and GcCycleStats objects + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + // Both should have been deleted Assert.assertEquals(0, nameToFileMap.size()); } }