ACCUMULO-2575 Fix bug in GC where files were being prematurely deleted, and
add tests to catch regressions

When tservers were changed to write the initial replication entry to the
metadata table (to avoid the deadlock potential of not doing so), the garbage
collection code was not updated for the shift in data layout. On a clean
system, when replication first begins, the replication table will not yet
exist, but replication entries will already exist in the metadata table.

The GC code was still assuming that the absence of the replication table
implied there was no data pending replication, and it short-circuited early.
We must always check the metadata table and, when the replication table is
present, check it as well (see the sketch below).
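
For clarity, a condensed sketch of the corrected lookup follows. The class
and schema names (MetadataTable, ReplicationSection, ReplicationTable,
StatusSection) are those used in the diff below, and imports are omitted;
treat this as an illustration of the control flow, not the literal patched
method:

    // Always consult the metadata table's replication section; the
    // replication table is only an additional source once it exists.
    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
      Scanner metaScanner;
      try {
        metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
      } catch (TableNotFoundException e) {
        throw new RuntimeException(e); // the metadata table must exist
      }
      metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
      metaScanner.fetchColumnFamily(ReplicationSection.COLF);

      try {
        Scanner replScanner = ReplicationTable.getScanner(conn);
        StatusSection.limit(replScanner);       // only Status records
        replScanner.setRange(Range.exact(wal)); // only this WAL
        return Iterables.concat(metaScanner, replScanner);
      } catch (TableNotFoundException e) {
        // Clean system: no replication table yet, but the metadata
        // entries alone still gate deletion of the WAL.
        return metaScanner;
      }
    }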


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/53fc90fc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/53fc90fc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/53fc90fc

Branch: refs/heads/ACCUMULO-378
Commit: 53fc90fcc783a37df666915cec45f3da3a290c7f
Parents: 440d7fc
Author: Josh Elser <els...@apache.org>
Authored: Thu May 8 16:16:33 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Thu May 8 16:16:33 2014 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 70 ++++++++++++++------
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 68 ++++++++++++++++++-
 .../replication/ReplicationWorkAssigner.java    |  2 +
 .../test/replication/ReplicationIT.java         |  6 +-
 .../test/replication/ReplicationWithGCIT.java   | 60 ++++-------------
 test/src/test/resources/log4j.properties        |  2 +
 6 files changed, 135 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 6e34e43..950bc12 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -66,16 +66,18 @@ import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
 
 public class GarbageCollectWriteAheadLogs {
-  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
+  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
   
   private final Instance instance;
   private final VolumeManager fs;
@@ -191,7 +193,7 @@ public class GarbageCollectWriteAheadLogs {
     } catch (KeeperException.NoNodeException ex) {
       return false;
     } catch (Exception ex) {
-      log.debug(ex, ex);
+      log.debug(ex.toString(), ex);
       return true;
     }
   }
@@ -349,13 +351,15 @@ public class GarbageCollectWriteAheadLogs {
       Entry<String,Path> wal = walIter.next();
       String fullPath = wal.getValue().toString();
       if (neededByReplication(conn, fullPath)) {
-        log.debug("Removing WAL from candidate deletion as it is still needed 
for replication: " + fullPath);
+        log.debug("Removing WAL from candidate deletion as it is still needed 
for replication: {} ", fullPath);
         // If we haven't already removed it, check to see if this WAL is
         // "in use" by replication (needed for replication purposes)
         status.currentLog.inUse++;
 
         walIter.remove();
         sortedWALogs.remove(wal.getKey());
+      } else {
+        log.debug("WAL not needed for replication {}", fullPath);
       }
       count++;
     }
@@ -370,13 +374,25 @@ public class GarbageCollectWriteAheadLogs {
    */
   protected boolean neededByReplication(Connector conn, String wal) {
     log.info("Checking replication table for " + wal);
-    Iterable<Entry<Key,Value>> iter;
-    try {
-      iter = getReplicationStatusForFile(conn, wal);
-    } catch (TableNotFoundException e) {
-      log.trace("Replication table was not found");
-      return false;
-    }
+
+    // try {
+    // log.info("Current state of Metadata table");
+    // for (Entry<Key,Value> entry : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+    // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
+    // }
+    // } catch (Exception e) {
+    // log.error("Could not read metadata table");
+    // }
+    // try {
+    // log.info("Current state of replication table");
+    // for (Entry<Key,Value> entry : conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+    // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
+    // }
+    // } catch (Exception e) {
+    // log.error("Could not read replication table");
+    // }
+
+    Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
 
     // TODO Push down this filter to the tserver to only return records
     // that are not completely replicated and convert this loop into a
@@ -384,6 +400,7 @@ public class GarbageCollectWriteAheadLogs {
     for (Entry<Key,Value> entry : iter) {
       try {
         Status status = Status.parseFrom(entry.getValue().get());
+        log.info("Checking if {} is safe for removal with {}", wal, 
TextFormat.shortDebugString(status));
         if (!StatusUtil.isSafeForRemoval(status)) {
           return true;
         }
@@ -395,21 +412,34 @@ public class GarbageCollectWriteAheadLogs {
     return false;
   }
 
-  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException {
-    Scanner replScanner = ReplicationTable.getScanner(conn);
-
-    // Scan only the Status records
-    StatusSection.limit(replScanner);
-    // Only look for this specific WAL
-    replScanner.setRange(Range.exact(wal));
+  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+    Scanner metaScanner;
+    try {
+      metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
 
-    Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     // Need to add in the replication section prefix
     metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
     // Limit the column family to be sure
     metaScanner.fetchColumnFamily(ReplicationSection.COLF);
 
-    return Iterables.concat(replScanner, metaScanner);
+    try {
+      Scanner replScanner = ReplicationTable.getScanner(conn);
+  
+      // Scan only the Status records
+      StatusSection.limit(replScanner);
+
+      // Only look for this specific WAL
+      replScanner.setRange(Range.exact(wal));
+
+      return Iterables.concat(metaScanner, replScanner);
+    } catch (TableNotFoundException e) {
+      // do nothing
+    }
+
+    return metaScanner;
   }
 
   private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 749fc96..1a3995b 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -29,17 +29,19 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
@@ -59,13 +61,13 @@ import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 public class GarbageCollectWriteAheadLogsTest {
@@ -313,7 +315,7 @@ public class GarbageCollectWriteAheadLogsTest {
     }
 
     @Override
-    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException {
+    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
       return this.replData;
     }
   }
@@ -434,4 +436,64 @@ public class GarbageCollectWriteAheadLogsTest {
     // Both should have been deleted
     Assert.assertEquals(0, nameToFileMap.size());
   }
+
+  @Test
+  public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
+    Instance inst = new MockInstance(testName.getMethodName());
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+
+    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+    bw.addMutation(m);
+    bw.close();
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
+
+    Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(data);
+
+    Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
+  }
+
+  @Test
+  public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
+    Instance inst = new MockInstance(testName.getMethodName());
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+    ReplicationTable.create(conn);
+
+    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+    bw.addMutation(m);
+    bw.close();
+
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(wal);
+    StatusSection.add(m, new Text("1"), StatusUtil.newFileValue());
+    bw.addMutation(m);
+    bw.close();
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
+
+    Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
+    Map<Key,Value> data = new HashMap<>();
+    for (Entry<Key,Value> e : iter) {
+      data.put(e.getKey(), e.getValue());
+    }
+
+    Assert.assertEquals(2, data.size());
+
+    // Should get one element from each table (metadata and replication)
+    for (Key k : data.keySet()) {
+      String row = k.getRow().toString();
+      if (row.startsWith(ReplicationSection.getRowPrefix())) {
+        Assert.assertTrue(row.endsWith(wal));
+      } else {
+        Assert.assertEquals(wal, row);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
index a6607e5..24842a9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -177,6 +177,8 @@ public class ReplicationWorkAssigner extends Daemon {
 
       // Keep the state of the work we queued correct
       cleanupFinishedWork();
+
+      UtilWaitThread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP));
     }
   }
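
The added sleep paces the work-assignment loop; without it the assigner would
spin through its queue-and-cleanup cycle as fast as possible. A minimal sketch
of the pattern (running, assignWork, and sleepMillis are illustrative
stand-ins, not names from the patch):

    // Paced polling loop: do one round of assignment, then back off.
    while (running) {
      assignWork();            // queue replication work that is ready
      cleanupFinishedWork();   // keep the queued-work state accurate
      UtilWaitThread.sleep(sleepMillis); // avoid busy-waiting between rounds
    }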
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index d592716..0d4099c 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -108,11 +108,11 @@ public class ReplicationIT extends ConfigurableMacIT {
       
       Scanner s = ReplicationTable.getScanner(connMaster);
       for (Entry<Key,Value> e : s) {
-        System.out.println(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+        log.info(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
       }
 
-      System.out.println();
-      System.out.println();
+      log.info("");
+      log.info("");
 
       Thread.sleep(1000);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 92f6fb3..b20a3bd 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -55,6 +55,7 @@ import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
@@ -62,6 +63,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
+import com.google.protobuf.TextFormat;
 
 /**
  * 
@@ -205,18 +207,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     }
     bw.close();
 
-    // System.out.println("**** WALs from metadata");
-    // for (String metadataWal : metadataWals) {
-    // System.out.println(metadataWal);
-    // }
-    //
-    // System.out.println("**** WALs from replication");
-    // s = ReplicationTable.getScanner(conn);
-    // StatusSection.limit(s);
-    // for (Entry<Key,Value> entry : s) {
-    // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
-    // }
-
     s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
     bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
@@ -231,13 +221,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     }
     bw.close();
 
-    // System.out.println("**** WALs from replication");
-    // s = ReplicationTable.getScanner(conn);
-    // StatusSection.limit(s);
-    // for (Entry<Key,Value> entry : s) {
-    // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
-    // }
-
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
@@ -254,24 +237,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
       Entry<Key,Value> entry : s) {}
     }
 
-    // for (int i = 0; i < 5; i++) {
-    // s = conn.createScanner(MetadataTable.NAME, new Authorizations());
-    // s.setRange(ReplicationSection.getRange());
-    // System.out.println("**** Metadata");
-    // for (Entry<Key,Value> entry : s) {
-    // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
-    // }
-    //
-    // s = ReplicationTable.getScanner(conn);
-    // StatusSection.limit(s);
-    // System.out.println("**** Replication status");
-    // for (Entry<Key,Value> entry : s) {
-    // System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
-    // }
-    //
-    // Thread.sleep(1000);
-    // }
-
     try {
       boolean allClosed = true;
       for (int i = 0; i < 10; i++) {
@@ -306,18 +271,10 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
       gc.waitFor();
     }
 
-    // System.out.println("****** Replication table iterators");
-    // System.out.println(conn.tableOperations().listIterators(ReplicationTable.NAME));
-    // for (Entry<String,String> entry : conn.tableOperations().getProperties(ReplicationTable.NAME)) {
-    // System.out.println(entry.getKey()+ "=" + entry.getValue());
-    // }
-    // System.out.println();
-
-    System.out.println("****** Final Replication logs before failure");
     s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
     for (Entry<Key,Value> entry : s) {
-      System.out.println(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+      log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
     }
     Assert.fail("Expected all replication records to be closed");
   }
@@ -325,6 +282,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
   @Test
   public void replicatedStatusEntriesAreDeleted() throws Exception {
     Connector conn = getConnector();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
     String table1 = "table1";
 
     // replication shouldn't exist when we begin
@@ -469,7 +427,15 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
       Thread.sleep(1000);
     }
 
-    Assert.assertTrue("Did nto find any replication entries in the replication 
table", foundResults);
+    Assert.assertTrue("Did not find any replication entries in the replication 
table", foundResults);
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    for (Entry<Key,Value> entry : s) {
+      String row = entry.getKey().getRow().toString();
+      Path file = new Path(row.substring(ReplicationSection.getRowPrefix().length()));
+      Assert.assertTrue(file + " did not exist when it should", fs.exists(file));
+    }
 
     /**
     * After recovery completes, we should have unreplicated, closed Status messages. The close happens at the beginning of log recovery.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53fc90fc/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 71c06e9..dd382f7 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -34,3 +34,5 @@ log4j.logger.org.apache.accumulo.core.file.rfile.bcfile=INFO
 log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE
 log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
 log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO
+log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
+log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
\ No newline at end of file
