ACCUMULO-2575 Fix bug in GC where files were being prematurely deleted and add tests to catch
After realizing the deadlock potential when not having tservers write the initial replication entry to the metadata table, the garbage collection code was not updated for the shift in data layout. On a clean system, when first beginning replication, the replication table will not yet exist, but replication entries will exist in the metadata table. The GC code was still assuming that the lack of a replication table implied that there was no data pending replication, and short-circuited early. We need to check the metadata table always, and, when present, also the replication table. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/53fc90fc Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/53fc90fc Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/53fc90fc Branch: refs/heads/ACCUMULO-378 Commit: 53fc90fcc783a37df666915cec45f3da3a290c7f Parents: 440d7fc Author: Josh Elser <els...@apache.org> Authored: Thu May 8 16:16:33 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 8 16:16:33 2014 -0400 ---------------------------------------------------------------------- .../gc/GarbageCollectWriteAheadLogs.java | 70 ++++++++++++++------ .../gc/GarbageCollectWriteAheadLogsTest.java | 68 ++++++++++++++++++- .../replication/ReplicationWorkAssigner.java | 2 + .../test/replication/ReplicationIT.java | 6 +- .../test/replication/ReplicationWithGCIT.java | 60 ++++------------- test/src/test/resources/log4j.properties | 2 + 6 files changed, 135 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 6e34e43..950bc12 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -66,16 +66,18 @@ import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.trace.instrument.Tracer; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.TextFormat; public class GarbageCollectWriteAheadLogs { - private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class); + private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class); private final Instance instance; private final VolumeManager fs; @@ -191,7 +193,7 @@ public class GarbageCollectWriteAheadLogs { } catch (KeeperException.NoNodeException ex) { return false; } catch (Exception ex) { - log.debug(ex, ex); + log.debug(ex.toString(), ex); return true; } } @@ -349,13 +351,15 @@ public class GarbageCollectWriteAheadLogs { Entry<String,Path> wal = walIter.next(); String fullPath = wal.getValue().toString(); if (neededByReplication(conn, fullPath)) { - log.debug("Removing WAL from candidate deletion as it is still needed for replication: " + fullPath); + log.debug("Removing WAL from candidate deletion as it is still needed for replication: {} ", fullPath); // If we haven't already removed it, check to see if this WAL is // "in use" by replication (needed for replication purposes) status.currentLog.inUse++; walIter.remove(); sortedWALogs.remove(wal.getKey()); + } else { + log.debug("WAL 
not needed for replication {}", fullPath); } count++; } @@ -370,13 +374,25 @@ public class GarbageCollectWriteAheadLogs { */ protected boolean neededByReplication(Connector conn, String wal) { log.info("Checking replication table for " + wal); - Iterable<Entry<Key,Value>> iter; - try { - iter = getReplicationStatusForFile(conn, wal); - } catch (TableNotFoundException e) { - log.trace("Replication table was not found"); - return false; - } + + // try { + // log.info("Current state of Metadata table"); + // for (Entry<Key,Value> entry : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); + // } + // } catch (Exception e) { + // log.error("Could not read metadata table"); + // } + // try { + // log.info("Current state of replication table"); + // for (Entry<Key,Value> entry : conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { + // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); + // } + // } catch (Exception e) { + // log.error("Could not read replication table"); + // } + + Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal); // TODO Push down this filter to the tserver to only return records // that are not completely replicated and convert this loop into a @@ -384,6 +400,7 @@ public class GarbageCollectWriteAheadLogs { for (Entry<Key,Value> entry : iter) { try { Status status = Status.parseFrom(entry.getValue().get()); + log.info("Checking if {} is safe for removal with {}", wal, TextFormat.shortDebugString(status)); if (!StatusUtil.isSafeForRemoval(status)) { return true; } @@ -395,21 +412,34 @@ public class GarbageCollectWriteAheadLogs { return false; } - protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException { - Scanner replScanner = 
ReplicationTable.getScanner(conn); - - // Scan only the Status records - StatusSection.limit(replScanner); - // Only look for this specific WAL - replScanner.setRange(Range.exact(wal)); + protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) { + Scanner metaScanner; + try { + metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } - Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); // Need to add in the replication section prefix metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal)); // Limit the column family to be sure metaScanner.fetchColumnFamily(ReplicationSection.COLF); - return Iterables.concat(replScanner, metaScanner); + try { + Scanner replScanner = ReplicationTable.getScanner(conn); + + // Scan only the Status records + StatusSection.limit(replScanner); + + // Only look for this specific WAL + replScanner.setRange(Range.exact(wal)); + + return Iterables.concat(metaScanner, replScanner); + } catch (TableNotFoundException e) { + // do nothing + } + + return metaScanner; } private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception { http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 749fc96..1a3995b 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -29,17 +29,19 @@ import java.io.IOException; import java.util.ArrayList; import 
java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; @@ -59,13 +61,13 @@ import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; public class GarbageCollectWriteAheadLogsTest { @@ -313,7 +315,7 @@ public class GarbageCollectWriteAheadLogsTest { } @Override - protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException { + protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) { return this.replData; } } @@ -434,4 +436,64 @@ public class GarbageCollectWriteAheadLogsTest { // Both should have been deleted Assert.assertEquals(0, nameToFileMap.size()); } + + @Test + public void noReplicationTableDoesntLimitMetatdataResults() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + Connector conn = inst.getConnector("root", new PasswordToken("")); + + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = 
conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue()); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); + + Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal); + Entry<Key,Value> entry = Iterables.getOnlyElement(data); + + Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString()); + } + + @Test + public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + Connector conn = inst.getConnector("root", new PasswordToken("")); + ReplicationTable.create(conn); + + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue()); + bw.addMutation(m); + bw.close(); + + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(wal); + StatusSection.add(m, new Text("1"), StatusUtil.newFileValue()); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); + + Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal); + Map<Key,Value> data = new HashMap<>(); + for (Entry<Key,Value> e : iter) { + data.put(e.getKey(), e.getValue()); + } + + Assert.assertEquals(2, data.size()); + + // Should get one element from each table (metadata and replication) + for (Key k : data.keySet()) { + String row = k.getRow().toString(); + if (row.startsWith(ReplicationSection.getRowPrefix())) { + Assert.assertTrue(row.endsWith(wal)); + } else { + 
Assert.assertEquals(wal, row); + } + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java index a6607e5..24842a9 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java @@ -177,6 +177,8 @@ public class ReplicationWorkAssigner extends Daemon { // Keep the state of the work we queued correct cleanupFinishedWork(); + + UtilWaitThread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java index d592716..0d4099c 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java @@ -108,11 +108,11 @@ public class ReplicationIT extends ConfigurableMacIT { Scanner s = ReplicationTable.getScanner(connMaster); for (Entry<Key,Value> e : s) { - System.out.println(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get()))); + log.info(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get()))); } - System.out.println(); - System.out.println(); + log.info(""); + 
log.info(""); Thread.sleep(1000); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java index 92f6fb3..b20a3bd 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java @@ -55,6 +55,7 @@ import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; @@ -62,6 +63,7 @@ import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Iterables; +import com.google.protobuf.TextFormat; /** * @@ -205,18 +207,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { } bw.close(); - // System.out.println("**** WALs from metadata"); - // for (String metadataWal : metadataWals) { - // System.out.println(metadataWal); - // } - // - // System.out.println("**** WALs from replication"); - // s = ReplicationTable.getScanner(conn); - // StatusSection.limit(s); - // for (Entry<Key,Value> entry : s) { - // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", ")); - // } - s = ReplicationTable.getScanner(conn); StatusSection.limit(s); bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); @@ -231,13 +221,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { } bw.close(); - // 
System.out.println("**** WALs from replication"); - // s = ReplicationTable.getScanner(conn); - // StatusSection.limit(s); - // for (Entry<Key,Value> entry : s) { - // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", ")); - // } - // Kill the tserver(s) and restart them // to ensure that the WALs we previously observed all move to closed. for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { @@ -254,24 +237,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { Entry<Key,Value> entry : s) {} } - // for (int i = 0; i < 5; i++) { - // s = conn.createScanner(MetadataTable.NAME, new Authorizations()); - // s.setRange(ReplicationSection.getRange()); - // System.out.println("**** Metadata"); - // for (Entry<Key,Value> entry : s) { - // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", ")); - // } - // - // s = ReplicationTable.getScanner(conn); - // StatusSection.limit(s); - // System.out.println("**** Replication status"); - // for (Entry<Key,Value> entry : s) { - // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", ")); - // } - // - // Thread.sleep(1000); - // } - try { boolean allClosed = true; for (int i = 0; i < 10; i++) { @@ -306,18 +271,10 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { gc.waitFor(); } - // System.out.println("****** Replication table iterators"); - // System.out.println(conn.tableOperations().listIterators(ReplicationTable.NAME)); - // for (Entry<String,String> entry : conn.tableOperations().getProperties(ReplicationTable.NAME)) { - // System.out.println(entry.getKey()+ "=" + entry.getValue()); - // } - // System.out.println(); - - System.out.println("****** Final Replication logs before failure"); s = ReplicationTable.getScanner(conn); 
StatusSection.limit(s); for (Entry<Key,Value> entry : s) { - System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", ")); + log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); } Assert.fail("Expected all replication records to be closed"); } @@ -325,6 +282,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { @Test public void replicatedStatusEntriesAreDeleted() throws Exception { Connector conn = getConnector(); + FileSystem fs = FileSystem.getLocal(new Configuration()); String table1 = "table1"; // replication shouldn't exist when we begin @@ -469,7 +427,15 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { Thread.sleep(1000); } - Assert.assertTrue("Did nto find any replication entries in the replication table", foundResults); + Assert.assertTrue("Did not find any replication entries in the replication table", foundResults); + + s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + for (Entry<Key,Value> entry : s) { + String row = entry.getKey().getRow().toString(); + Path file = new Path(row.substring(ReplicationSection.getRowPrefix().length())); + Assert.assertTrue(file + " did not exist when it should", fs.exists(file)); + } /** * After recovery completes, we should have unreplicated, closed Status messages. The close happens at the beginning of log recovery. 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties index 71c06e9..dd382f7 100644 --- a/test/src/test/resources/log4j.properties +++ b/test/src/test/resources/log4j.properties @@ -34,3 +34,5 @@ log4j.logger.org.apache.accumulo.core.file.rfile.bcfile=INFO log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO +log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO +log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN \ No newline at end of file