ACCUMULO-3320 Fetch the necessary info from the master and tservers in the GC
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2b347346 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2b347346 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2b347346 Branch: refs/heads/master Commit: 2b3473469d38241f3a9ada79d968853801d91597 Parents: a754e1b Author: Josh Elser <els...@apache.org> Authored: Sat Nov 8 22:03:21 2014 -0500 Committer: Josh Elser <els...@apache.org> Committed: Mon Nov 10 13:34:51 2014 -0800 ---------------------------------------------------------------------- .../gc/GarbageCollectWriteAheadLogs.java | 129 ++++++++++++++++--- .../gc/GarbageCollectWriteAheadLogsTest.java | 79 ++++++++++++ .../master/MasterClientServiceHandler.java | 13 +- .../apache/accumulo/tserver/TabletServer.java | 7 + .../test/performance/thrift/NullTserver.java | 5 + 5 files changed, 215 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index e96fee3..50256b2 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -36,11 +36,13 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; @@ -51,12 +53,14 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; @@ -69,6 +73,7 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -355,12 +360,23 @@ public class GarbageCollectWriteAheadLogs { throw new IllegalArgumentException(e); } + + final AccumuloConfiguration conf = new ServerConfigurationFactory(instance).getConfiguration(); + final TInfo tinfo = Tracer.traceInfo(); + final TCredentials tcreds = SystemCredentials.get().toThrift(getInstance()); + Set<String> activeWals = getActiveWals(conf, tinfo, tcreds); + int count = 0; Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator(); while (walIter.hasNext()) { Entry<String,Path> wal = walIter.next(); String fullPath = wal.getValue().toString(); + if (null == activeWals) { + log.debug("Could not contact servers to determine if WAL is still in use"); + walIter.remove(); + sortedWALogs.remove(wal.getKey()); + } if (neededByReplication(conn, fullPath)) { log.debug("Removing WAL from candidate deletion as it is still needed for replication: {} ", fullPath); // If we haven't already removed it, check to see if this WAL is @@ -378,6 +394,102 @@ public class GarbageCollectWriteAheadLogs { return count; } + private String getMasterAddress() { + try { + List<String> locations = getInstance().getMasterLocations(); + if (locations.size() == 0) + return null; + return locations.get(0); + } catch (Exception e) { + log.warn("Failed to obtain master host " + e); + } + + return null; + } + + private MasterClientService.Client getMasterConnection(AccumuloConfiguration conf) { + final String address = getMasterAddress(); + try { + if (address == null) { + log.warn("Could not fetch Master address"); + return null; + } + return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT, conf); + } catch (Exception e) { + log.warn("Issue with masterConnection (" + address + ") " + e, e); + } + return null; + } + + /** + * Fetch the set of WALs in use by tabletservers + * + * @return Set of WALs in use by tservers, null if they cannot be computed for some reason + */ + protected Set<String> getActiveWals(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { + List<String> tservers = getActiveTservers(conf, tinfo, tcreds); + + // Compute the total set of WALs used by tservers + Set<String> walogs = null; + if (null != tservers) { + walogs = new HashSet<String>(); + for (String tserver : tservers) { + HostAndPort address = HostAndPort.fromString(tserver); + List<String> activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address); + if (null == activeWalsForServer) { + log.debug("Could not fetch active wals from " + address); + return null; + } + log.debug("Got active wals for " + address + ", " + activeWalsForServer); + walogs.addAll(activeWalsForServer); + } + } + + return walogs; + } + + /** + * Get the active tabletservers as seen by the master. + * + * @return The active tabletservers, null if they can't be computed. + */ + protected List<String> getActiveTservers(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { + MasterClientService.Client client = null; + + List<String> tservers = null; + try { + client = getMasterConnection(conf); + + if (null != client) { + tservers = client.getActiveTservers(tinfo, tcreds); + } + } catch (TException e) { + // If we can't fetch the tabletservers, we can't fetch any active WALs + log.warn("Failed to fetch active tabletservers from the master", e); + return null; + } finally { + ThriftUtil.returnClient(client); + } + + return tservers; + } + + protected List<String> getActiveWalsForServer(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds, HostAndPort server) { + TabletClientService.Client tserverClient = null; + try { + tserverClient = ThriftUtil.getClient(new TabletClientService.Client.Factory(), server, conf); + return tserverClient.getActiveLogs(tinfo, tcreds); + } catch (TTransportException e) { + log.warn("Failed to fetch active write-ahead logs from " + server, e); + return null; + } catch (TException e) { + log.warn("Failed to fetch active write-ahead logs from " + server, e); + return null; + } finally { + ThriftUtil.returnClient(tserverClient); + } + } + /** * Determine if the given WAL is needed for replication * @@ -388,23 +500,6 @@ public class GarbageCollectWriteAheadLogs { protected boolean neededByReplication(Connector conn, String wal) { log.info("Checking replication table for " + wal); - // try { - // log.info("Current state of Metadata table"); - // for (Entry<Key,Value> entry : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { - // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); - // } - // } catch (Exception e) { - // log.error("Could not read metadata table"); - // } - // try { - // log.info("Current state of replication table"); - // for (Entry<Key,Value> entry : conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { - // log.info(entry.getKey().toStringNoTruncate() + "=" + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); - // } - // } catch (Exception e) { - // log.error("Could not read replication table"); - // } - Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal); // TODO Push down this filter to the tserver to only return records http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 71e5f7d..780907f 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -27,12 +27,15 @@ import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; @@ -41,6 +44,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -54,10 +58,13 @@ import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -66,6 +73,7 @@ import org.junit.rules.TestName; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.net.HostAndPort; public class GarbageCollectWriteAheadLogsTest { private static final long BLOCK_SIZE = 64000000L; @@ -517,4 +525,75 @@ public class GarbageCollectWriteAheadLogsTest { } } } + + @Test + public void getActiveWals() throws Exception { + GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346"); + EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); + int numWals = 0; + for (String tserver : tservers) { + EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals)); + numWals++; + } + + EasyMock.replay(gcWals); + + Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(gcWals); + + Set<String> expectedWals = new HashSet<String>(); + for (int i = 0; i < numWals; i++) { + expectedWals.add("/wal" + i); + } + + Assert.assertEquals(expectedWals, wals); + } + + @Test + public void offlineMaster() throws Exception { + GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(null); + + EasyMock.replay(gcWals); + + Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(gcWals); + + Assert.assertNull("Expected to get null for active WALs", wals); + } + + @Test + public void offlineTserver() throws Exception { + GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346"); + EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); + EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0)); + EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12346"))).andReturn(null); + + EasyMock.replay(gcWals); + + Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(gcWals); + + Assert.assertNull("Expected to get null for active WALs", wals); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java index 2007b36..9a3e532 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.Set; @@ -435,7 +436,7 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli private void alterTableProperty(TCredentials c, String tableName, String property, String value, TableOperation op) throws ThriftSecurityException, ThriftTableOperationException { final String tableId = ClientServiceHandler.checkTableId(master.getInstance(), tableName, op); - String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); + String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); if (!master.security.canAlterTable(c, tableId, namespaceId)) throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); @@ -472,4 +473,14 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli master.waitForBalance(tinfo); } + @Override + public List<String> getActiveTservers(TInfo tinfo, TCredentials credentials) throws TException { + Set<TServerInstance> tserverInstances = master.onlineTabletServers(); + List<String> servers = new ArrayList<String>(); + for (TServerInstance tserverInstance : tserverInstances) { + servers.add(tserverInstance.getLocation().toString()); + } + + return servers; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1e81947..6002789 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1743,6 +1743,13 @@ public class TabletServer implements Runnable { return ret; } + + @Override + public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { + Set<String> logs = new HashSet<String>(); + logger.getLogFiles(logs); + return new ArrayList<String>(logs); + } } private class SplitRunner implements Runnable { http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b347346/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index a47f21a..b6cf727 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -224,6 +224,11 @@ public class NullTserver { @Override public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {} + + @Override + public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { + return null; + } } static class Opts extends Help {