Repository: accumulo
Updated Branches:
  refs/heads/master 67249ec2e -> b7a529b75


ACCUMULO-3423 more fixes for replication


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ecf2298e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ecf2298e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ecf2298e

Branch: refs/heads/master
Commit: ecf2298eb9b764c01edc3f16f0c0cb6e6c4006cb
Parents: 844166a
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Mon Jun 15 09:59:38 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Mon Jun 15 09:59:38 2015 -0400

----------------------------------------------------------------------
 .../accumulo/server/log/WalStateManager.java    |  8 ++++-
 .../CloseWriteAheadLogReferences.java           |  2 +-
 .../java/org/apache/accumulo/master/Master.java | 11 +++++++
 .../accumulo/master/TabletGroupWatcher.java     |  4 ++-
 .../test/replication/ReplicationIT.java         | 31 +++++---------------
 5 files changed, 29 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java 
b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
index 1540938..52844c1 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
@@ -34,6 +34,8 @@ import 
org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * This class governs the space in Zookeeper that advertises the status of 
Write-Ahead Logs
@@ -68,6 +70,8 @@ public class WalStateManager {
     }
   }
 
+  private static final Logger log = 
LoggerFactory.getLogger(WalStateManager.class);
+
   public final static String ZWALS = "/wals";
 
   public static enum WalState {
@@ -113,6 +117,7 @@ public class WalStateManager {
       if (state == WalState.OPEN) {
         policy = NodeExistsPolicy.FAIL;
       }
+      log.debug("Setting {} to {}", path.getName(), state);
       zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + 
path.getName(), data, policy);
     } catch (KeeperException | InterruptedException e) {
       throw new WalMarkerException(e);
@@ -193,6 +198,7 @@ public class WalStateManager {
   // garbage collector knows it's safe to remove the marker for a closed log
   public void removeWalMarker(TServerInstance instance, UUID uuid) throws 
WalMarkerException {
     try {
+      log.debug("Removing {}", uuid);
       String path = root() + "/" + instance.toString() + "/" + uuid.toString();
       zoo.delete(path, -1);
     } catch (InterruptedException | KeeperException e) {
@@ -211,8 +217,8 @@ public class WalStateManager {
   }
 
   // tablet server can mark the log as closed (but still needed), for 
replication to begin
+  // master can mark a log as closed after it has made log recovery 
markers on the tablets that need to be recovered
   public void closeWal(TServerInstance instance, Path path) throws 
WalMarkerException {
     updateState(instance, path, WalState.CLOSED);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 8857939..0c09396 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -185,7 +185,7 @@ public class CloseWriteAheadLogReferences implements 
Runnable {
 
         // We only want to clean up WALs (which is everything but rfiles) and 
only when
         // metadata doesn't have a reference to the given WAL
-        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && 
!isClosed) {
+        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && 
isClosed) {
           try {
             closeWal(bw, entry.getKey());
             recordsClosed++;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9a324fb..0cf84f2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -97,6 +97,8 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
@@ -1586,4 +1588,13 @@ public class Master extends AccumuloServerContext 
implements LiveTServerSet.List
       return new HashSet<TServerInstance>(serversToShutdown);
     }
   }
+
+  public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> 
logsForDeadServers) throws WalMarkerException {
+    WalStateManager mgr = new WalStateManager(this.inst, 
ZooReaderWriter.getInstance());
+    for (Entry<TServerInstance,List<Path>> server : 
logsForDeadServers.entrySet()) {
+      for (Path path : server.getValue()) {
+          mgr.closeWal(server.getKey(), path);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
 
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index d55781e..d2cbf62 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -69,6 +69,7 @@ import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.ClosableIterator;
@@ -748,12 +749,13 @@ class TabletGroupWatcher extends Daemon {
 
   private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> 
currentTServers, List<Assignment> assignments, List<Assignment> assigned,
       List<TabletLocationState> assignedToDeadServers, 
Map<TServerInstance,List<Path>> logsForDeadServers, 
Map<KeyExtent,TServerInstance> unassigned)
-      throws DistributedStoreException, TException {
+      throws DistributedStoreException, TException, WalMarkerException {
     if (!assignedToDeadServers.isEmpty()) {
       int maxServersToShow = min(assignedToDeadServers.size(), 100);
       Master.log.debug(assignedToDeadServers.size() + " assigned to dead 
servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
       Master.log.debug("logs for dead servers: " + logsForDeadServers);
       store.unassign(assignedToDeadServers, logsForDeadServers);
+      this.master.markDeadServerLogsAsClosed(logsForDeadServers);
       this.master.nextEvent.event("Marked %d tablets as unassigned because 
they don't have current servers", assignedToDeadServers.size());
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java 
b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 55379a4..e0a9121 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -435,26 +435,17 @@ public class ReplicationIT extends ConfigurableMacBase {
     Assert.assertTrue("Replication table did not exist", online);
 
     Assert.assertTrue(ReplicationTable.isOnline(conn));
-    conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.READ);
 
     // Verify that we found a single replication record that's for table1
     Scanner s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
-    Iterator<Entry<Key,Value>> iter = s.iterator();
-    attempts = 5;
-    while (attempts > 0) {
-      if (!iter.hasNext()) {
-        s.close();
-        Thread.sleep(1000);
-        s = ReplicationTable.getScanner(conn);
-        iter = s.iterator();
-        attempts--;
-      } else {
+    for (int i = 0; i < 5; i++) {
+      if (Iterators.size(s.iterator()) == 1) {
         break;
       }
+      Thread.sleep(1000);
     }
-    Assert.assertTrue(iter.hasNext());
-    Entry<Key,Value> entry = iter.next();
+    Entry<Key,Value> entry = Iterators.getOnlyElement(s.iterator());
     // We should at least find one status record for this table, we might find 
a second if another log was started from ingesting the data
     Assert.assertEquals("Expected to find replication entry for " + table1, 
conn.tableOperations().tableIdMap().get(table1), entry.getKey()
         .getColumnQualifier().toString());
@@ -469,23 +460,15 @@ public class ReplicationIT extends ConfigurableMacBase {
     // After the commit on these mutations, we'll get a replication entry in 
accumulo.metadata for table2
     // Don't want to compact table2 as it ultimately causes the entry in 
accumulo.metadata to be removed before we can verify it's there
 
-    // After writing data, we'll get a replication table online
-    Assert.assertTrue(ReplicationTable.isOnline(conn));
-
     Set<String> tableIds = 
Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), 
conn.tableOperations().tableIdMap().get(table2));
     Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
 
+    List<Entry<Key,Value>> records = new ArrayList<>();
     s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     s.setRange(MetadataSchema.ReplicationSection.getRange());
-
-    List<Entry<Key,Value>> records = new ArrayList<>();
     for (Entry<Key,Value> metadata : s) {
       records.add(metadata);
-    }
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    for (Entry<Key,Value> replication : s) {
-      records.add(replication);
+      log.debug("Meta: {} => {}", metadata.getKey().toStringNoTruncate(), 
metadata.getValue().toString());
     }
 
     Assert.assertEquals("Expected to find 2 records, but actually found " + 
records, 2, records.size());
@@ -503,7 +486,7 @@ public class ReplicationIT extends ConfigurableMacBase {
     // Verify that we found two replication records: one for table1 and one 
for table2
     s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
-    iter = s.iterator();
+    Iterator<Entry<Key,Value>> iter = s.iterator();
     Assert.assertTrue("Found no records in replication table", iter.hasNext());
     entry = iter.next();
     Assert.assertTrue("Expected to find element in replication table", 
tableIds.remove(entry.getKey().getColumnQualifier().toString()));

Reply via email to