ACCUMULO-3320 Fetch the necessary info from the master and tservers in the GC


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2b347346
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2b347346
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2b347346

Branch: refs/heads/master
Commit: 2b3473469d38241f3a9ada79d968853801d91597
Parents: a754e1b
Author: Josh Elser <els...@apache.org>
Authored: Sat Nov 8 22:03:21 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Mon Nov 10 13:34:51 2014 -0800

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 129 ++++++++++++++++---
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  79 ++++++++++++
 .../master/MasterClientServiceHandler.java      |  13 +-
 .../apache/accumulo/tserver/TabletServer.java   |   7 +
 .../test/performance/thrift/NullTserver.java    |   5 +
 5 files changed, 215 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index e96fee3..50256b2 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -36,11 +36,13 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -51,12 +53,14 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -69,6 +73,7 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -355,12 +360,23 @@ public class GarbageCollectWriteAheadLogs {
       throw new IllegalArgumentException(e);
     }
 
+
+    final AccumuloConfiguration conf = new 
ServerConfigurationFactory(instance).getConfiguration();
+    final TInfo tinfo = Tracer.traceInfo();
+    final TCredentials tcreds = 
SystemCredentials.get().toThrift(getInstance());
+    Set<String> activeWals = getActiveWals(conf, tinfo, tcreds);
+
     int count = 0;
 
     Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
     while (walIter.hasNext()) {
       Entry<String,Path> wal = walIter.next();
       String fullPath = wal.getValue().toString();
+      if (null == activeWals) {
+        log.debug("Could not contact servers to determine if WAL is still in 
use");
+        walIter.remove();
+        sortedWALogs.remove(wal.getKey());
+      }
       if (neededByReplication(conn, fullPath)) {
         log.debug("Removing WAL from candidate deletion as it is still needed 
for replication: {} ", fullPath);
         // If we haven't already removed it, check to see if this WAL is
@@ -378,6 +394,102 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
+  private String getMasterAddress() {
+    try {
+      List<String> locations = getInstance().getMasterLocations();
+      if (locations.size() == 0)
+        return null;
+      return locations.get(0);
+    } catch (Exception e) {
+      log.warn("Failed to obtain master host " + e);
+    }
+
+    return null;
+  }
+
+  private MasterClientService.Client getMasterConnection(AccumuloConfiguration 
conf) {
+    final String address = getMasterAddress();
+    try {
+      if (address == null) {
+        log.warn("Could not fetch Master address");
+        return null;
+      }
+      return ThriftUtil.getClient(new MasterClientService.Client.Factory(), 
address, Property.GENERAL_RPC_TIMEOUT, conf);
+    } catch (Exception e) {
+      log.warn("Issue with masterConnection (" + address + ") " + e, e);
+    }
+    return null;
+  }
+
+  /**
+   * Fetch the set of WALs in use by tabletservers
+   *
+   * @return Set of WALs in use by tservers, null if they cannot be computed 
for some reason
+   */
+  protected Set<String> getActiveWals(AccumuloConfiguration conf, TInfo tinfo, 
TCredentials tcreds) {
+    List<String> tservers = getActiveTservers(conf, tinfo, tcreds);
+
+    // Compute the total set of WALs used by tservers
+    Set<String> walogs = null;
+    if (null != tservers) {
+      walogs = new HashSet<String>();
+      for (String tserver : tservers) {
+        HostAndPort address = HostAndPort.fromString(tserver);
+        List<String> activeWalsForServer = getActiveWalsForServer(conf, tinfo, 
tcreds, address);
+        if (null == activeWalsForServer) {
+          log.debug("Could not fetch active wals from " + address);
+          return null;
+        }
+        log.debug("Got active wals for " + address + ", " + 
activeWalsForServer);
+        walogs.addAll(activeWalsForServer);
+      }
+    }
+
+    return walogs;
+  }
+
+  /**
+   * Get the active tabletservers as seen by the master.
+   *
+   * @return The active tabletservers, null if they can't be computed.
+   */
+  protected List<String> getActiveTservers(AccumuloConfiguration conf, TInfo 
tinfo, TCredentials tcreds) {
+    MasterClientService.Client client = null;
+
+    List<String> tservers = null;
+    try {
+      client = getMasterConnection(conf);
+
+      if (null != client) {
+        tservers = client.getActiveTservers(tinfo, tcreds);
+      }
+    } catch (TException e) {
+      // If we can't fetch the tabletservers, we can't fetch any active WALs
+      log.warn("Failed to fetch active tabletservers from the master", e);
+      return null;
+    } finally {
+      ThriftUtil.returnClient(client);
+    }
+
+    return tservers;
+  }
+
+  protected List<String> getActiveWalsForServer(AccumuloConfiguration conf, 
TInfo tinfo, TCredentials tcreds, HostAndPort server) {
+    TabletClientService.Client tserverClient = null;
+    try {
+      tserverClient = ThriftUtil.getClient(new 
TabletClientService.Client.Factory(), server, conf);
+      return tserverClient.getActiveLogs(tinfo, tcreds);
+    } catch (TTransportException e) {
+      log.warn("Failed to fetch active write-ahead logs from " + server, e);
+      return null;
+    } catch (TException e) {
+      log.warn("Failed to fetch active write-ahead logs from " + server, e);
+      return null;
+    } finally {
+      ThriftUtil.returnClient(tserverClient);
+    }
+  }
+
   /**
    * Determine if the given WAL is needed for replication
    *
@@ -388,23 +500,6 @@ public class GarbageCollectWriteAheadLogs {
   protected boolean neededByReplication(Connector conn, String wal) {
     log.info("Checking replication table for " + wal);
 
-    // try {
-    // log.info("Current state of Metadata table");
-    // for (Entry<Key,Value> entry : conn.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY)) {
-    // log.info(entry.getKey().toStringNoTruncate() + "=" + 
TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
-    // }
-    // } catch (Exception e) {
-    // log.error("Could not read metadata table");
-    // }
-    // try {
-    // log.info("Current state of replication table");
-    // for (Entry<Key,Value> entry : conn.createScanner(ReplicationTable.NAME, 
Authorizations.EMPTY)) {
-    // log.info(entry.getKey().toStringNoTruncate() + "=" + 
TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
-    // }
-    // } catch (Exception e) {
-    // log.error("Could not read replication table");
-    // }
-
     Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
 
     // TODO Push down this filter to the tserver to only return records

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 71e5f7d..780907f 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -27,12 +27,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -41,6 +44,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -54,10 +58,13 @@ import 
org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -66,6 +73,7 @@ import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
 
 public class GarbageCollectWriteAheadLogsTest {
   private static final long BLOCK_SIZE = 64000000L;
@@ -517,4 +525,75 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     }
   }
+
+  @Test
+  public void getActiveWals() throws Exception {
+    GarbageCollectWriteAheadLogs gcWals = 
EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+
+    List<String> tservers = Arrays.asList("localhost:12345", 
"localhost:12346");
+    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, 
tcreds)).andReturn(tservers);
+    int numWals = 0;
+    for (String tserver : tservers) {
+      EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, 
HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals));
+      numWals++;
+    }
+
+    EasyMock.replay(gcWals);
+
+    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
+
+    EasyMock.verify(gcWals);
+
+    Set<String> expectedWals = new HashSet<String>();
+    for (int i = 0; i < numWals; i++) {
+      expectedWals.add("/wal" + i);
+    }
+
+    Assert.assertEquals(expectedWals, wals);
+  }
+
+  @Test
+  public void offlineMaster() throws Exception {
+    GarbageCollectWriteAheadLogs gcWals = 
EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+
+    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, 
tcreds)).andReturn(null);
+
+    EasyMock.replay(gcWals);
+
+    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
+
+    EasyMock.verify(gcWals);
+
+    Assert.assertNull("Expected to get null for active WALs", wals);
+  }
+
+  @Test
+  public void offlineTserver() throws Exception {
+    GarbageCollectWriteAheadLogs gcWals = 
EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = 
EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+
+    List<String> tservers = Arrays.asList("localhost:12345", 
"localhost:12346");
+    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, 
tcreds)).andReturn(tservers);
+    EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, 
HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 
0));
+    EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, 
HostAndPort.fromString("localhost:12346"))).andReturn(null);
+
+    EasyMock.replay(gcWals);
+
+    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
+
+    EasyMock.verify(gcWals);
+
+    Assert.assertNull("Expected to get null for active WALs", wals);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
 
b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 2007b36..9a3e532 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -435,7 +436,7 @@ class MasterClientServiceHandler extends FateServiceHandler 
implements MasterCli
   private void alterTableProperty(TCredentials c, String tableName, String 
property, String value, TableOperation op) throws ThriftSecurityException,
       ThriftTableOperationException {
     final String tableId = 
ClientServiceHandler.checkTableId(master.getInstance(), tableName, op);
-    String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); 
+    String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId);
     if (!master.security.canAlterTable(c, tableId, namespaceId))
       throw new ThriftSecurityException(c.getPrincipal(), 
SecurityErrorCode.PERMISSION_DENIED);
 
@@ -472,4 +473,14 @@ class MasterClientServiceHandler extends 
FateServiceHandler implements MasterCli
     master.waitForBalance(tinfo);
   }
 
+  @Override
+  public List<String> getActiveTservers(TInfo tinfo, TCredentials credentials) 
throws TException {
+    Set<TServerInstance> tserverInstances = master.onlineTabletServers();
+    List<String> servers = new ArrayList<String>();
+    for (TServerInstance tserverInstance : tserverInstances) {
+      servers.add(tserverInstance.getLocation().toString());
+    }
+
+    return servers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1e81947..6002789 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1743,6 +1743,13 @@ public class TabletServer implements Runnable {
 
       return ret;
     }
+    
+    @Override
+    public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) 
throws TException {
+      Set<String> logs = new HashSet<String>();
+      logger.getLogFiles(logs);
+      return new ArrayList<String>(logs);
+    }
   }
 
   private class SplitRunner implements Runnable {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index a47f21a..b6cf727 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -224,6 +224,11 @@ public class NullTserver {
 
     @Override
     public void closeConditionalUpdate(TInfo tinfo, long sessID) throws 
TException {}
+    
+    @Override
+    public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) 
throws TException {
+      return null;
+    }
   }
 
   static class Opts extends Help {

Reply via email to