ACCUMULO-3320 Some cleanup and add IT suffix to test name.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/00105ce1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/00105ce1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/00105ce1 Branch: refs/heads/master Commit: 00105ce12259de5d37a7e1e1fac4964b15b08be4 Parents: 09cb6b2 Author: Josh Elser <els...@apache.org> Authored: Mon Nov 10 10:11:13 2014 -0800 Committer: Josh Elser <els...@apache.org> Committed: Mon Nov 10 13:34:51 2014 -0800 ---------------------------------------------------------------------- .../CloseWriteAheadLogReferences.java | 5 + ...arbageCollectorCommunicatesWithTServers.java | 446 ------------------- ...bageCollectorCommunicatesWithTServersIT.java | 446 +++++++++++++++++++ 3 files changed, 451 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/00105ce1/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index a41e965..b0fd0f4 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -344,6 +344,9 @@ public class CloseWriteAheadLogReferences implements Runnable { Set<String> walogs = null; if (null != tservers) { walogs = new HashSet<String>(); + // TODO If we have a lot of tservers, this might start to take a fair amount of time + // Consider adding a threadpool to parallelize the requests. + // Alternatively, we might have to move to a solution that doesn't involve tserver RPC for (String tserver : tservers) { HostAndPort address = HostAndPort.fromString(tserver); List<String> activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address); @@ -374,6 +377,8 @@ public class CloseWriteAheadLogReferences implements Runnable { try { client = getMasterConnection(conf); + // Could do this through InstanceOperations, but that would set a bunch of new Watchers via ZK on every tserver + // node. The master is already tracking all of this info, so hopefully this is less overall work. if (null != client) { tservers = client.getActiveTservers(tinfo, tcreds); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/00105ce1/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java deleted file mode 100644 index dff2726..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.impl.ClientConfigurationHelper; -import org.apache.accumulo.core.client.impl.ClientExecReturn; -import org.apache.accumulo.core.client.impl.MasterClient; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.master.thrift.MasterClientService; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.ReplicationTable; -import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.core.security.thrift.TCredentials; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.ThriftUtil; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.AbstractMacIT; -import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.io.Text; -import org.bouncycastle.util.Arrays; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ACCUMULO-3302 series of tests which ensure that a WAL is prematurely closed when a TServer may still continue to use it. Checking that no tablet references a - * WAL is insufficient to determine if a WAL will never be used in the future. 
- */ -public class GarbageCollectorCommunicatesWithTServers extends ConfigurableMacIT { - private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServers.class); - - private final int GC_PERIOD_SECONDS = 1; - - @Override - public int defaultTimeoutSeconds() { - return 2 * 60; - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { - cfg.setNumTservers(1); - cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); - // Wait longer to try to let the replication table come online before a cycle runs - cfg.setProperty(Property.GC_CYCLE_START, "10s"); - cfg.setProperty(Property.REPLICATION_NAME, "master"); - // Set really long delays for the master to do stuff for replication - cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s"); - cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s"); - cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s"); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); - coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - /** - * Fetch all of the WALs referenced by tablets in the metadata table for this table - */ - private Set<String> getWalsForTable(String tableName) throws Exception { - final Connector conn = getConnector(); - final String tableId = conn.tableOperations().tableIdMap().get(tableName); - - Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); - - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Range r = MetadataSchema.TabletsSection.getRange(tableId); - s.setRange(r); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); - - Set<String> wals = new HashSet<String>(); - for (Entry<Key,Value> entry : s) { - log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - // hostname:port/uri://path/to/wal - String cq = entry.getKey().getColumnQualifier().toString(); - int index = cq.indexOf('/'); - // Normalize the path - String path = new Path(cq.substring(index + 1)).toString(); - log.debug("Extracted file: " + path); - wals.add(path); - } - - return wals; - } - - /** - * Fetch all of the rfiles referenced by tablets in the metadata table for this table - */ - private Set<String> getFilesForTable(String tableName) throws Exception { - final Connector conn = getConnector(); - final String tableId = conn.tableOperations().tableIdMap().get(tableName); - - Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); - - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Range r = MetadataSchema.TabletsSection.getRange(tableId); - s.setRange(r); - s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - - Set<String> rfiles = new HashSet<String>(); - for (Entry<Key,Value> entry : s) { - log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - // uri://path/to/wal - String cq = entry.getKey().getColumnQualifier().toString(); - String path = new Path(cq).toString(); - log.debug("Normalize path to rfile: {}", path); - rfiles.add(path); - } - - return rfiles; - } - - /** - * Get the replication status messages for the given table that exist in the metadata table (~repl entries) - */ - private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception { - final Connector conn = getConnector(); - final String tableId = conn.tableOperations().tableIdMap().get(tableName); - - Assert.assertNotNull("Could 
not determine table ID for " + tableName, tableId); - - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Range r = MetadataSchema.ReplicationSection.getRange(); - s.setRange(r); - s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId)); - - Map<String,Status> fileToStatus = new HashMap<String,Status>(); - for (Entry<Key,Value> entry : s) { - Text file = new Text(); - MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); - Status status = Status.parseFrom(entry.getValue().get()); - log.info("Got status for {}: {}", file, ProtobufUtil.toString(status)); - fileToStatus.put(file.toString(), status); - } - - return fileToStatus; - } - - @Test - public void testActiveWalPrecludesClosing() throws Exception { - final String table = getUniqueNames(1)[0]; - final Connector conn = getConnector(); - - // Bring the replication table online first and foremost - ReplicationTable.setOnline(conn); - - log.info("Creating {}", table); - conn.tableOperations().create(table); - - conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); - - log.info("Writing a few mutations to the table"); - - BatchWriter bw = conn.createBatchWriter(table, null); - - byte[] empty = new byte[0]; - for (int i = 0; i < 5; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put(empty, empty, empty); - bw.addMutation(m); - } - - log.info("Flushing mutations to the server"); - bw.flush(); - - log.info("Checking that metadata only has one WAL recorded for this table"); - - Set<String> wals = getWalsForTable(table); - Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); - - log.info("Compacting the table which will remove all WALs from the tablets"); - - // Flush our test table to remove the WAL references in it - conn.tableOperations().flush(table, null, null, true); - // Flush the metadata table too because it will have a reference to the WAL - conn.tableOperations().flush(MetadataTable.NAME, null, null, true); - - log.info("Waiting for replication table to come online"); - - log.info("Fetching replication statuses from metadata table"); - - Map<String,Status> fileToStatus = getMetadataStatusForTable(table); - - Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); - - String walName = fileToStatus.keySet().iterator().next(); - Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); - - Status status = fileToStatus.get(walName); - - Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - - Set<String> filesForTable = getFilesForTable(table); - Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); - log.info("Files for table before MajC: {}", filesForTable); - - // Issue a MajC to roll a new file in HDFS - conn.tableOperations().compact(table, null, null, false, true); - - Set<String> filesForTableAfterCompaction = getFilesForTable(table); - - log.info("Files for table after MajC: {}", filesForTableAfterCompaction); - - Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); - 
Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); - - // Use the rfile which was just replaced by the MajC to determine when the GC has ran - Path fileToBeDeleted = new Path(filesForTable.iterator().next()); - FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); - - boolean fileExists = fs.exists(fileToBeDeleted); - while (fileExists) { - log.info("File which should get deleted still exists: {}", fileToBeDeleted); - Thread.sleep(2000); - fileExists = fs.exists(fileToBeDeleted); - } - - // At this point in time, we *know* that the GarbageCollector has run which means that the Status - // for our WAL should not be altered. - - log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); - Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); - - Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); - } - - @Test - public void testUnreferencedWalInTserverIsClosed() throws Exception { - final String[] names = getUniqueNames(2); - // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver - final String table = names[0], otherTable = names[1]; - final Connector conn = getConnector(); - - // Bring the replication table online first and foremost - ReplicationTable.setOnline(conn); - - log.info("Creating {}", table); - conn.tableOperations().create(table); - - conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); - - log.info("Writing a few mutations to the table"); - - BatchWriter bw = conn.createBatchWriter(table, null); - - byte[] empty = new byte[0]; - for (int i = 0; i < 5; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put(empty, empty, empty); - bw.addMutation(m); - } - - log.info("Flushing mutations to the server"); - bw.close(); - - log.info("Checking that metadata only has one WAL recorded for this table"); - - Set<String> wals = getWalsForTable(table); - Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); - - log.info("Compacting the table which will remove all WALs from the tablets"); - - // Flush our test table to remove the WAL references in it - conn.tableOperations().flush(table, null, null, true); - // Flush the metadata table too because it will have a reference to the WAL - conn.tableOperations().flush(MetadataTable.NAME, null, null, true); - - log.info("Waiting for replication table to come online"); - - log.info("Fetching replication statuses from metadata table"); - - Map<String,Status> fileToStatus = getMetadataStatusForTable(table); - - Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); - - String walName = fileToStatus.keySet().iterator().next(); - Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); - - Status status = fileToStatus.get(walName); - - Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the 
log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - - Set<String> filesForTable = getFilesForTable(table); - Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); - log.info("Files for table before MajC: {}", filesForTable); - - // Issue a MajC to roll a new file in HDFS - conn.tableOperations().compact(table, null, null, false, true); - - Set<String> filesForTableAfterCompaction = getFilesForTable(table); - - log.info("Files for table after MajC: {}", filesForTableAfterCompaction); - - Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); - Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); - - // Use the rfile which was just replaced by the MajC to determine when the GC has ran - Path fileToBeDeleted = new Path(filesForTable.iterator().next()); - FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); - - boolean fileExists = fs.exists(fileToBeDeleted); - while (fileExists) { - log.info("File which should get deleted still exists: {}", fileToBeDeleted); - Thread.sleep(2000); - fileExists = fs.exists(fileToBeDeleted); - } - - // At this point in time, we *know* that the GarbageCollector has run which means that the Status - // for our WAL should not be altered. - - log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); - Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); - - Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); - - /* - * To verify that the WALs is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. 
The easiest way to do - * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of - */ - - conn.tableOperations().create(otherTable); - bw = conn.createBatchWriter(otherTable, null); - // 500k - byte[] bigValue = new byte[1024 * 500]; - Arrays.fill(bigValue, (byte)1); - // 500k * 50 - for (int i = 0; i < 50; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put(empty, empty, bigValue); - bw.addMutation(m); - if (i % 10 == 0) { - bw.flush(); - } - } - - bw.close(); - - conn.tableOperations().flush(otherTable, null, null, true); - - final TCredentials tcreds = new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)).toThrift(conn.getInstance()); - - // Get the tservers which the master deems as active - List<String> tservers = MasterClient.execute(conn.getInstance(), new ClientExecReturn<List<String>,MasterClientService.Client>() { - @Override - public List<String> execute(MasterClientService.Client client) throws Exception { - return client.getActiveTservers(Tracer.traceInfo(), tcreds); - } - }); - - Assert.assertEquals("Expected only one active tservers", 1, tservers.size()); - - String tserver = tservers.get(0); - AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance()); - - // Get the active WALs from that server - log.info("Fetching active WALs from {}", tserver); - - Client client = ThriftUtil.getTServerClient(tserver, rpcConfig); - List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), tcreds); - - log.info("Active wals: {}", activeWalsForTserver); - - Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size()); - - String activeWal = new Path(activeWalsForTserver.get(0)).toString(); - - Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal); - - log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver"); - - do { - Map<String,Status> replicationStatuses = getMetadataStatusForTable(table); - - log.info("Got replication status messages {}", replicationStatuses); - Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size()); - - status = replicationStatuses.values().iterator().next(); - log.info("Current status: {}", ProtobufUtil.toString(status)); - - if (status.getClosed()) { - return; - } - - log.info("Status is not yet closed, waiting for garbage collector to close it"); - - Thread.sleep(2000); - } while (true); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/00105ce1/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java new file mode 100644 index 0000000..1044677 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.ClientConfigurationHelper; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.AbstractMacIT; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.bouncycastle.util.Arrays; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet references a + * WAL is insufficient to determine if a WAL will never be used in the future.
+ */ +public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class); + + private final int GC_PERIOD_SECONDS = 1; + + @Override + public int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); + // Wait longer to try to let the replication table come online before a cycle runs + cfg.setProperty(Property.GC_CYCLE_START, "10s"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + // Set really long delays for the master to do stuff for replication. We don't need + // it to be doing anything, so just let it sleep + cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s"); + cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s"); + // Pull down the maximum size of the wal so we can test close()'ing it. + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Fetch all of the WALs referenced by tablets in the metadata table for this table + */ + private Set<String> getWalsForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + + Set<String> wals = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // hostname:port/uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + int index = cq.indexOf('/'); + // Normalize the path + String path = new Path(cq.substring(index + 1)).toString(); + log.debug("Extracted file: " + path); + wals.add(path); + } + + return wals; + } + + /** + * Fetch all of the rfiles referenced by tablets in the metadata table for this table + */ + private Set<String> getFilesForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + Set<String> rfiles = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + String path = new Path(cq).toString(); + log.debug("Normalize path to rfile: {}", path); + rfiles.add(path); + } + + return rfiles; + } + + /** + * Get the replication status messages for the given table that exist in the metadata table (~repl entries) + */ + private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception { + final 
Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.ReplicationSection.getRange(); + s.setRange(r); + s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId)); + + Map<String,Status> fileToStatus = new HashMap<String,Status>(); + for (Entry<Key,Value> entry : s) { + Text file = new Text(); + MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); + Status status = Status.parseFrom(entry.getValue().get()); + log.info("Got status for {}: {}", file, ProtobufUtil.toString(status)); + fileToStatus.put(file.toString(), status); + } + + return fileToStatus; + } + + @Test + public void testActiveWalPrecludesClosing() throws Exception { + final String table = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.flush(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Waiting for replication table to come online"); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", 
filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has run + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + } + + @Test + public void testUnreferencedWalInTserverIsClosed() throws Exception { + final String[] names = getUniqueNames(2); + // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver + final String table = names[0], otherTable = names[1]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.close(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log
entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has run + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + + /* + * To verify that the WAL is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead.
The easiest way to do + * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of + */ + + conn.tableOperations().create(otherTable); + bw = conn.createBatchWriter(otherTable, null); + // 500k + byte[] bigValue = new byte[1024 * 500]; + Arrays.fill(bigValue, (byte)1); + // 500k * 50 + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, bigValue); + bw.addMutation(m); + if (i % 10 == 0) { + bw.flush(); + } + } + + bw.close(); + + conn.tableOperations().flush(otherTable, null, null, true); + + final TCredentials tcreds = new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)).toThrift(conn.getInstance()); + + // Get the tservers which the master deems as active + List<String> tservers = MasterClient.execute(conn.getInstance(), new ClientExecReturn<List<String>,MasterClientService.Client>() { + @Override + public List<String> execute(MasterClientService.Client client) throws Exception { + return client.getActiveTservers(Tracer.traceInfo(), tcreds); + } + }); + + Assert.assertEquals("Expected only one active tserver", 1, tservers.size()); + + String tserver = tservers.get(0); + AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance()); + + // Get the active WALs from that server + log.info("Fetching active WALs from {}", tserver); + + Client client = ThriftUtil.getTServerClient(tserver, rpcConfig); + List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), tcreds); + + log.info("Active wals: {}", activeWalsForTserver); + + Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size()); + + String activeWal = new Path(activeWalsForTserver.get(0)).toString(); + + Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal); + + log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver"); + + do { + Map<String,Status> replicationStatuses = getMetadataStatusForTable(table); + + log.info("Got replication status messages {}", replicationStatuses); + Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size()); + + status = replicationStatuses.values().iterator().next(); + log.info("Current status: {}", ProtobufUtil.toString(status)); + + if (status.getClosed()) { + return; + } + + log.info("Status is not yet closed, waiting for garbage collector to close it"); + + Thread.sleep(2000); + } while (true); + } +}
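----------------------------------------------------------------------

A minimal sketch of the threadpool idea from the new TODO in CloseWriteAheadLogReferences, assuming the names visible in the hunk above (conf, tinfo, tcreds, tservers, walogs, getActiveWalsForServer) are in scope as final fields or parameters, plus java.util.ArrayList and java.util.concurrent imports. This is illustrative only, not part of this commit:

    // Fan the per-tserver RPCs out over a small pool instead of looping serially
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(8, tservers.size())));
    try {
      List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>(tservers.size());
      for (final String tserver : tservers) {
        futures.add(pool.submit(new Callable<List<String>>() {
          @Override
          public List<String> call() throws Exception {
            // Same RPC as the serial loop, one call per tserver, now issued concurrently
            return getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver));
          }
        }));
      }
      walogs = new HashSet<String>();
      for (Future<List<String>> future : futures) {
        // Defensive null check, in case a tserver reports no active WALs
        List<String> activeWalsForServer = future.get();
        if (null != activeWalsForServer) {
          walogs.addAll(activeWalsForServer);
        }
      }
    } finally {
      pool.shutdownNow();
    }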
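The other comment added in that file rejects fetching the tserver list through the public client API. For reference, the rejected alternative is the one-liner below (conn being a Connector); it goes through ZooKeeper-backed client code, which can set watches on each tserver node, whereas the master RPC reuses state the master is already tracking:

    // Rejected alternative: read the live tserver list via the client API (and ZK)
    List<String> tservers = conn.instanceOperations().getTabletServers();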
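On the write load used in testUnreferencedWalInTserverIsClosed to force the tserver onto a fresh WAL, the sizing comfortably clears the configured cap:

    50 mutations * (1024 * 500) bytes per value = 25,600,000 bytes, roughly 24 MB

against the 1M TSERV_WALOG_MAX_SIZE set in configure(), so the tserver is guaranteed to close the original WAL and open a new one at least once.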
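The do/while (true) at the end of that test is bounded only by the two-minute JUnit timeout from defaultTimeoutSeconds(). A hypothetical deadline-based variant, sketched here in case the loop ever needs to stand on its own outside the harness (not part of this commit; reuses table, status, and walName from the test body):

    // Hypothetical bounded version of the closed-status polling loop
    long deadline = System.currentTimeMillis() + 2 * 60 * 1000;
    while (System.currentTimeMillis() < deadline) {
      Map<String,Status> replicationStatuses = getMetadataStatusForTable(table);
      Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size());
      status = replicationStatuses.values().iterator().next();
      log.info("Current status: {}", ProtobufUtil.toString(status));
      if (status.getClosed()) {
        return;
      }
      log.info("Status is not yet closed, waiting for garbage collector to close it");
      Thread.sleep(2000);
    }
    Assert.fail("Garbage collector never closed the replication status for " + walName);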