This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new c1b1781f0e  test volume replacement with flaky ample (#4781)
c1b1781f0e is described below

commit c1b1781f0e30a4b110946c9429a99e4918d08261
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Aug 1 08:29:09 2024 -0700

    test volume replacement with flaky ample (#4781)

    The goal of these changes is to exercise the conditional mutation code
    used to replace volumes in tablets. Refactored some code in VolumeIT
    into VolumeITBase so that VolumeITBase could be extended by
    VolumeFlakyAmpleIT, which injects a manager and tservers that use
    FlakyAmple.

    Ran into problems during this change. The first problem was that
    manager and tserver processes were dying because of OOME. Using heap
    dumps, found that TestAmple objects were continually being created and
    that each TestAmple object created a hadoop configuration object.
    Eventually memory would fill up with these configuration objects. Do
    not fully understand why the hadoop config objects never went away,
    since it seemed like the TestAmple objects should have been transient.
    This problem may be worth further investigation. In this commit only a
    single TestAmple object per ServerContext is created to solve the
    problem.

    The second problem encountered was that VolumeIT created 2 tables with
    100 tablets each. When using FlakyAmple all tablet operations take
    longer, and the test ended up timing out because of this. To solve
    this problem, lowered the number of tablets and increased the number
    of compactors.

    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 .../accumulo/manager/TabletGroupWatcher.java       |  23 +-
 .../apache/accumulo/test/VolumeFlakyAmpleIT.java   |  53 ++++
 .../java/org/apache/accumulo/test/VolumeIT.java    | 329 +--------------------
 .../test/{VolumeIT.java => VolumeITBase.java}      | 256 +++-------------
 .../test/ample/FlakyAmpleServerContext.java        |  16 +-
 5 files changed, 120 insertions(+), 557 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 6ef007449f..d621b65a46 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
@@ -64,7 +63,6 @@ import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample;
@@ -742,8 +740,13 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
       // replacement. We only want to stop looking for tablets that need volume replacement when
       // we have successfully processed all tablet metadata and no more volume replacements are
       // being performed.
+      Manager.log.debug("[{}] saw {} tablets needing volume replacement", store.name(),
+          tabletMgmtStats.totalVolumeReplacements);
       lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0
           || tabletMgmtStats.tabletsWithErrors != 0;
+      if (!lookForTabletsNeedingVolReplacement) {
+        Manager.log.debug("[{}] no longer looking for volume replacements", store.name());
+      }
     }
 
     // provide stats after flushing changes to avoid race conditions w/ delete table
@@ -1048,9 +1051,19 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
       vr.filesToRemove.forEach(tabletMutator::deleteFile);
       vr.filesToAdd.forEach(tabletMutator::putFile);
 
-      tabletMutator.submit(
-          tm -> tm.getLogs().containsAll(vr.logsToAdd) && tm.getFiles().containsAll(vr.filesToAdd
-              .keySet().stream().map(ReferencedTabletFile::insert).collect(Collectors.toSet())));
+      tabletMutator.submit(tm -> {
+        // Check to see if the logs and files are removed. Checking if the new files or logs were
+        // added has a race condition: those could have been successfully added and then removed
+        // before this check runs, like if a compaction runs. Once the old volumes are removed,
+        // nothing should ever add them again.
+        var logsRemoved =
+            Collections.disjoint(Set.copyOf(tm.getLogs()), Set.copyOf(vr.logsToRemove));
+        var filesRemoved = Collections.disjoint(tm.getFiles(), Set.copyOf(vr.filesToRemove));
+        LOG.debug(
+            "replaceVolume conditional mutation rejection check {} logsRemoved:{} filesRemoved:{}",
+            tm.getExtent(), logsRemoved, filesRemoved);
+        return logsRemoved && filesRemoved;
+      });
     }
 
     tabletsMutator.process().forEach((extent, result) -> {
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java
new file mode 100644
index 0000000000..1262c00fa0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.TreeSet;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.ample.FlakyAmpleManager;
+import org.apache.accumulo.test.ample.FlakyAmpleTserver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+public class VolumeFlakyAmpleIT extends VolumeITBase {
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    super.configure(cfg, hadoopCoreSite);
+    cfg.setServerClass(ServerType.MANAGER, FlakyAmpleManager.class);
+    cfg.setServerClass(ServerType.TABLET_SERVER, FlakyAmpleTserver.class);
+    // The test creates a lot of tablets that need to compact. Reserving and committing
+    // compactions is slower because of FlakyAmple causing conditional mutations to fail. So
+    // start more compactors to compensate for this.
+    cfg.getClusterServerConfiguration().setNumDefaultCompactors(3);
+  }
+
+  @Override
+  protected TreeSet<Text> generateSplits() {
+    // The regular version of this test creates 100 tablets. However, 100 tablets and FlakyAmple
+    // causing each tablet operation to take longer results in longer test run times. So lower the
+    // number of tablets to 10 to speed up the test with flaky ample.
+    TreeSet<Text> splits = new TreeSet<>();
+    for (int i = 10; i < 100; i += 10) {
+      splits.add(new Text(String.format("%06d", i * 100)));
+    }
+    return splits;
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index 25bea4e892..e6336d6d35 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -18,16 +18,9 @@
  */
 package org.apache.accumulo.test;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,94 +29,34 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.init.Initialize;
-import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
-import org.apache.accumulo.server.log.WalStateManager.WalState;
-import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.Admin;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.accumulo.test.util.FileMetadataUtil;
-import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.jupiter.api.Test;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-public class VolumeIT extends ConfigurableMacBase {
-
-  private File volDirBase;
-  private Path v1, v2, v3;
-  private List<String> expected = new ArrayList<>();
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    File baseDir = cfg.getDir();
-    volDirBase = new File(baseDir, "volumes");
-    File v1f = new File(volDirBase, "v1");
-    File v2f = new File(volDirBase, "v2");
-    v1 = new Path("file://" + v1f.getAbsolutePath());
-    v2 = new Path("file://" + v2f.getAbsolutePath());
-    File v3f = new File(volDirBase, "v3");
-    v3 = new Path("file://" + v3f.getAbsolutePath());
-    // setup expected rows
-    for (int i = 0; i < 100; i++) {
-      String row = String.format("%06d", i * 100 + 3);
-      expected.add(row + ":cf1:cq1:1");
-    }
-
-    // Run MAC on two locations in the local file system
-    cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
-    cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "15s");
-
-    // use raw local file system so walogs sync and flush will work
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-
-    super.configure(cfg, hadoopCoreSite);
-  }
+public class VolumeIT extends VolumeITBase {
 
   @Test
   public void test() throws Exception {
@@ -174,23 +107,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void verifyData(List<String> expected, Scanner createScanner) {
-
-    List<String> actual = new ArrayList<>();
-
-    for (Entry<Key,Value> entry : createScanner) {
-      Key k = entry.getKey();
-      actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
-          + entry.getValue());
-    }
-
-    Collections.sort(expected);
-    Collections.sort(actual);
-
-    createScanner.close();
-    assertEquals(expected, actual);
-  }
-
   @Test
   public void testAddVolumes() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -263,111 +179,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void writeData(String tableName, AccumuloClient client) throws AccumuloException,
-      AccumuloSecurityException, TableExistsException, TableNotFoundException {
-
-    TreeSet<Text> splits = new TreeSet<>();
-    for (int i = 1; i < 100; i++) {
-      splits.add(new Text(String.format("%06d", i * 100)));
-    }
-
-    NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
-    client.tableOperations().create(tableName, ntc);
-
-    try (BatchWriter bw = client.createBatchWriter(tableName)) {
-      for (int i = 0; i < 100; i++) {
-        String row = String.format("%06d", i * 100 + 3);
-        Mutation m = new Mutation(row);
-        m.put("cf1", "cq1", "1");
-        bw.addMutation(m);
-      }
-    }
-  }
-
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
-      Path... paths) throws Exception {
-    verifyVolumesUsed(client, tableName, shouldExist, false, paths);
-  }
-
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
-      boolean rangedFiles, Path... paths) throws Exception {
-
-    if (!client.tableOperations().exists(tableName)) {
-      assertFalse(shouldExist);
-
-      writeData(tableName, client);
-
-      verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
-
-      client.tableOperations().flush(tableName, null, null, true);
-    }
-
-    verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
-
-    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
-    try (Scanner metaScanner =
-        client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-      metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      metaScanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
-
-      int[] counts = new int[paths.length];
-
-      outer: for (Entry<Key,Value> entry : metaScanner) {
-        String path = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath();
-
-        for (int i = 0; i < paths.length; i++) {
-          if (path.contains(paths[i].toString())) {
-            counts[i]++;
-            continue outer;
-          }
-        }
-
-        fail("Unexpected volume " + path);
-      }
-
-      // keep retrying until WAL state information in ZooKeeper stabilizes or until test times out
-      retry: while (true) {
-        WalStateManager wals = new WalStateManager(getServerContext());
-        try {
-          outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
-            for (Path path : paths) {
-              if (entry.getKey().toString().startsWith(path.toString())) {
-                continue outer;
-              }
-            }
-            log.warn("Unexpected volume " + entry.getKey() + " (" + entry.getValue() + ")");
-            UtilWaitThread.sleep(100);
-            continue retry;
-          }
-        } catch (WalMarkerException e) {
-          Throwable cause = e.getCause();
-          if (cause instanceof NoNodeException) {
-            // ignore WALs being cleaned up
-            continue retry;
-          }
-          throw e;
-        }
-        break;
-      }
-
-      // if a volume is chosen randomly for each tablet, then the probability that a volume will not
-      // be chosen for any tablet is ((num_volumes -
-      // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2
-      // volumes would be chosen is 2.46e-18
-
-      int sum = 0;
-      for (int count : counts) {
-        assertTrue(count > 0);
-        sum += count;
-      }
-
-      // When ranged files exist we there should be twice as many
-      // as the test split each file into 2
-      int expectedCount = rangedFiles ? 200 : 100;
-      assertEquals(expectedCount, sum);
-    }
-  }
-
   @Test
   public void testRemoveVolumes() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -411,93 +222,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void testReplaceVolume(AccumuloClient client, boolean cleanShutdown, boolean rangedFiles)
-      throws Exception {
-    String[] tableNames = getUniqueNames(3);
-
-    verifyVolumesUsed(client, tableNames[0], false, v1, v2);
-
-    // write to 2nd table, but do not flush data to disk before shutdown
-    try (AccumuloClient c2 =
-        cluster.createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD))) {
-      writeData(tableNames[1], c2);
-    }
-
-    // If flag is true then for each file split and create two files
-    // to verify volume replacement works on files with ranges
-    if (rangedFiles) {
-      splitFilesWithRange(client, tableNames[0]);
-      splitFilesWithRange(client, tableNames[1]);
-    }
-
-    if (cleanShutdown) {
-      assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-    }
-
-    cluster.stop();
-
-    File v1f = new File(v1.toUri());
-    File v8f = new File(new File(v1.getParent().toUri()), "v8");
-    assertTrue(v1f.renameTo(v8f), "Failed to rename " + v1f + " to " + v8f);
-    Path v8 = new Path(v8f.toURI());
-
-    File v2f = new File(v2.toUri());
-    File v9f = new File(new File(v2.getParent().toUri()), "v9");
-    assertTrue(v2f.renameTo(v9f), "Failed to rename " + v2f + " to " + v9f);
-    Path v9 = new Path(v9f.toURI());
-
-    updateConfig(config -> {
-      config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
-      config.setProperty(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(),
-          v1 + " " + v8 + "," + v2 + " " + v9);
-    });
-
-    // start cluster and verify that volumes were replaced
-    cluster.start();
-
-    verifyVolumesUsed(client, tableNames[0], true, rangedFiles, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, rangedFiles, v8, v9);
-
-    // verify writes to new dir
-    client.tableOperations().compact(tableNames[0], null, null, true, true);
-    client.tableOperations().compact(tableNames[1], null, null, true, true);
-
-    // Always pass false for ranged files as compaction will clean them up if exist
-    verifyVolumesUsed(client, tableNames[0], true, false, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, false, v8, v9);
-
-    client.tableOperations().compact(AccumuloTable.ROOT.tableName(),
-        new CompactionConfig().setWait(true));
-
-    // check that root tablet is not on volume 1 or 2
-    int count = 0;
-    for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
-        .getFiles()) {
-      assertTrue(file.getMetadataPath().startsWith(v8.toString())
-          || file.getMetadataPath().startsWith(v9.toString()));
-      count++;
-    }
-
-    assertTrue(count > 0);
-
-    client.tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<>(),
-        new HashSet<>());
-
-    client.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null, true);
-    client.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null, true);
-
-    verifyVolumesUsed(client, tableNames[0], true, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, v8, v9);
-    verifyVolumesUsed(client, tableNames[2], true, v8, v9);
-  }
-
-  @Test
-  public void testCleanReplaceVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, true, false);
-    }
-  }
-
   @Test
   public void testDirtyReplaceVolumes() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -519,55 +243,4 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
-  private void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
-    var file = new File(cluster.getAccumuloPropertiesPath());
-    var config = new PropertiesConfiguration();
-    try (FileReader out = new FileReader(file, UTF_8)) {
-      config.read(out);
-    }
-    updater.accept(config);
-    try (FileWriter out = new FileWriter(file, UTF_8)) {
-      config.write(out);
-    }
-  }
-
-  // Go through each tablet file in metadata and split the files into two files
-  // by adding two new entries that covers half of the file. This will test that
-  // files with ranges work properly with volume replacement
-  private void splitFilesWithRange(AccumuloClient client, String tableName) throws Exception {
-    client.securityOperations().grantTablePermission(cluster.getConfig().getRootUserName(),
-        AccumuloTable.METADATA.tableName(), TablePermission.WRITE);
-    final ServerContext ctx = getServerContext();
-    ctx.setCredentials(new SystemCredentials(client.instanceOperations().getInstanceId(), "root",
-        new PasswordToken(ROOT_PASSWORD)));
-
-    AtomicInteger i = new AtomicInteger();
-    FileMetadataUtil.mutateTabletFiles(ctx, tableName, null, null, (tm, mutator, file, value) -> {
-      i.incrementAndGet();
-
-      // Create a mutation to delete the existing file metadata entry with infinite range
-      mutator.deleteFile(file);
-
-      // Find the midpoint and create two new files, each with a range covering half the file
-      Text tabletMidPoint = getTabletMidPoint(tm.getExtent().endRow());
-      // Handle edge case for last tablet
-      if (tabletMidPoint == null) {
-        tabletMidPoint = new Text(
-            String.format("%06d", Integer.parseInt(tm.getExtent().prevEndRow().toString()) + 50));
-      }
-
-      final DataFileValue newValue = new DataFileValue(Integer.max(1, (int) (value.getSize() / 2)),
-          Integer.max(1, (int) (value.getNumEntries() / 2)));
-      mutator.putFile(StoredTabletFile.of(file.getPath(),
-          new Range(tm.getExtent().prevEndRow(), false, tabletMidPoint, true)), newValue);
-      mutator.putFile(StoredTabletFile.of(file.getPath(),
-          new Range(tabletMidPoint, false, tm.getExtent().endRow(), true)), newValue);
-    });
-  }
-
-  private static Text getTabletMidPoint(Text row) {
-    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
-        : null;
-  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
similarity index 61%
copy from test/src/main/java/org/apache/accumulo/test/VolumeIT.java
copy to test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
index 25bea4e892..2622824151 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
@@ -28,18 +28,15 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedSet;
+import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -49,13 +46,11 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
-import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -66,42 +61,42 @@ import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.init.Initialize;
 import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
-import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.accumulo.test.util.FileMetadataUtil;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.Test;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
-public class VolumeIT extends ConfigurableMacBase {
+public abstract class VolumeITBase extends ConfigurableMacBase {
+  protected Path v1;
+  protected Path v2;
+  protected Path v3;
+  protected List<String> expected = new ArrayList<>();
 
-  private File volDirBase;
-  private Path v1, v2, v3;
-  private List<String> expected = new ArrayList<>();
+  private static Text getTabletMidPoint(Text row) {
+    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
+        : null;
+  }
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     File baseDir = cfg.getDir();
-    volDirBase = new File(baseDir, "volumes");
+    File volDirBase = new File(baseDir, "volumes");
     File v1f = new File(volDirBase, "v1");
     File v2f = new File(volDirBase, "v2");
     v1 = new Path("file://" + v1f.getAbsolutePath());
@@ -125,60 +120,11 @@ public class VolumeIT extends ConfigurableMacBase {
     super.configure(cfg, hadoopCoreSite);
   }
 
-  @Test
-  public void test() throws Exception {
-    // create a table
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String tableName = getUniqueNames(1)[0];
-      // create set of splits
-      SortedSet<Text> partitions = new TreeSet<>();
-      for (String s : "d,m,t".split(",")) {
-        partitions.add(new Text(s));
-      }
-      // create table with splits
-      NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitions);
-      client.tableOperations().create(tableName, ntc);
-      // scribble over the splits
-      VolumeChooserIT.writeDataToTable(client, tableName, VolumeChooserIT.alpha_rows);
-      // write the data to disk, read it back
-      client.tableOperations().flush(tableName, null, null, true);
-      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
-        int i = 0;
-        for (Entry<Key,Value> entry : scanner) {
-          assertEquals(VolumeChooserIT.alpha_rows[i++], entry.getKey().getRow().toString());
-        }
-      }
-      // verify the new files are written to the different volumes
-      try (Scanner scanner =
-          client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-        scanner.setRange(new Range("1", "1<"));
-        scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-        int fileCount = 0;
-
-        for (Entry<Key,Value> entry : scanner) {
-          boolean inV1 = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath()
-              .contains(v1.toString());
-          boolean inV2 = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath()
-              .contains(v2.toString());
-          assertTrue(inV1 || inV2);
-          fileCount++;
-        }
-        assertEquals(4, fileCount);
-        List<DiskUsage> diskUsage =
-            client.tableOperations().getDiskUsage(Collections.singleton(tableName));
-        assertEquals(1, diskUsage.size());
-        long usage = diskUsage.get(0).getUsage();
-        log.debug("usage {}", usage);
-        assertTrue(usage > 700 && usage < 900);
-      }
-    }
-  }
-
-  private void verifyData(List<String> expected, Scanner createScanner) {
+  protected void verifyData(List<String> expected, Scanner createScanner) {
 
     List<String> actual = new ArrayList<>();
 
-    for (Entry<Key,Value> entry : createScanner) {
+    for (Map.Entry<Key,Value> entry : createScanner) {
       Key k = entry.getKey();
       actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
           + entry.getValue());
@@ -191,85 +137,18 @@ public class VolumeIT extends ConfigurableMacBase {
   }
 
-  @Test
-  public void testAddVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String[] tableNames = getUniqueNames(2);
-
-      InstanceId uuid = verifyAndShutdownCluster(client, tableNames[0]);
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(),
-          v1 + "," + v2 + "," + v3));
-
-      // initialize volume
-      assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
-
-      checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
-
-      // start cluster and verify that new volume is used
-      cluster.start();
-
-      verifyVolumesUsed(client, tableNames[1], false, v1, v2, v3);
-    }
-  }
-
-  // grab uuid before shutting down cluster
-  private InstanceId verifyAndShutdownCluster(AccumuloClient c, String tableName) throws Exception {
-    InstanceId uuid = c.instanceOperations().getInstanceId();
-
-    verifyVolumesUsed(c, tableName, false, v1, v2);
-
-    assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-    cluster.stop();
-
-    return uuid;
-  }
-
-  @Test
-  public void testNonConfiguredVolumes() throws Exception {
-
-    String[] tableNames = getUniqueNames(2);
-
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-
-      InstanceId uuid = verifyAndShutdownCluster(client, tableNames[0]);
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2 + "," + v3));
-
-      // initialize volume
-      assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
-
-      checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
-
-      // start cluster and verify that new volume is used
-      cluster.start();
-
-      // verify we can still read the tables (tableNames[0] is likely to have a file still on v1)
-      verifyData(expected, client.createScanner(tableNames[0], Authorizations.EMPTY));
-
-      // v1 should not have any data for tableNames[1]
-      verifyVolumesUsed(client, tableNames[1], false, v2, v3);
-    }
-  }
-
-  // check that all volumes are initialized
-  private void checkVolumesInitialized(List<Path> volumes, InstanceId uuid) throws Exception {
-    for (Path volumePath : volumes) {
-      FileSystem fs = volumePath.getFileSystem(cluster.getServerContext().getHadoopConf());
-      Path vp = new Path(volumePath, Constants.INSTANCE_ID_DIR);
-      FileStatus[] iids = fs.listStatus(vp);
-      assertEquals(1, iids.length);
-      assertEquals(uuid.canonical(), iids[0].getPath().getName());
+  protected TreeSet<Text> generateSplits() {
+    TreeSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 100; i++) {
+      splits.add(new Text(String.format("%06d", i * 100)));
     }
+    return splits;
   }
 
   private void writeData(String tableName, AccumuloClient client) throws AccumuloException,
       AccumuloSecurityException, TableExistsException, TableNotFoundException {
 
-    TreeSet<Text> splits = new TreeSet<>();
-    for (int i = 1; i < 100; i++) {
-      splits.add(new Text(String.format("%06d", i * 100)));
-    }
+    TreeSet<Text> splits = generateSplits();
 
     NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
     client.tableOperations().create(tableName, ntc);
@@ -284,7 +163,7 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
+  protected void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
       Path... paths) throws Exception {
     verifyVolumesUsed(client, tableName, shouldExist, false, paths);
   }
@@ -307,12 +186,12 @@ public class VolumeIT extends ConfigurableMacBase {
     TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
     try (Scanner metaScanner =
         client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-      metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
       metaScanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
 
       int[] counts = new int[paths.length];
 
-      outer: for (Entry<Key,Value> entry : metaScanner) {
+      outer: for (Map.Entry<Key,Value> entry : metaScanner) {
        String path = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath();
 
        for (int i = 0; i < paths.length; i++) {
@@ -329,7 +208,8 @@ public class VolumeIT extends ConfigurableMacBase {
      retry: while (true) {
        WalStateManager wals = new WalStateManager(getServerContext());
        try {
-          outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+          outer: for (Map.Entry<Path,WalStateManager.WalState> entry : wals.getAllState()
+              .entrySet()) {
            for (Path path : paths) {
              if (entry.getKey().toString().startsWith(path.toString())) {
                continue outer;
@@ -339,9 +219,9 @@ public class VolumeIT extends ConfigurableMacBase {
            UtilWaitThread.sleep(100);
            continue retry;
          }
-        } catch (WalMarkerException e) {
+        } catch (WalStateManager.WalMarkerException e) {
          Throwable cause = e.getCause();
-          if (cause instanceof NoNodeException) {
+          if (cause instanceof KeeperException.NoNodeException) {
            // ignore WALs being cleaned up
            continue retry;
          }
@@ -363,56 +243,14 @@ public class VolumeIT extends ConfigurableMacBase {
 
       // When ranged files exist we there should be twice as many
       // as the test split each file into 2
-      int expectedCount = rangedFiles ? 200 : 100;
+      int numTablets = generateSplits().size() + 1;
+      int expectedCount = rangedFiles ? numTablets * 2 : numTablets;
       assertEquals(expectedCount, sum);
     }
   }
 
-  @Test
-  public void testRemoveVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String[] tableNames = getUniqueNames(2);
-
-      verifyVolumesUsed(client, tableNames[0], false, v1, v2);
-
-      assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-      cluster.stop();
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2.toString()));
-
-      // start cluster and verify that volume was decommissioned
-      cluster.start();
-
-      client.tableOperations().compact(tableNames[0], null, null, true, true);
-
-      verifyVolumesUsed(client, tableNames[0], true, v2);
-
-      client.tableOperations().compact(AccumuloTable.ROOT.tableName(),
-          new CompactionConfig().setWait(true));
-
-      // check that root tablet is not on volume 1
-      int count = 0;
-      for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
-          .getFiles()) {
-        assertTrue(file.getMetadataPath().startsWith(v2.toString()));
-        count++;
-      }
-
-      assertTrue(count > 0);
-
-      client.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<>(),
-          new HashSet<>());
-
-      client.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null, true);
-      client.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null, true);
-
-      verifyVolumesUsed(client, tableNames[0], true, v2);
-      verifyVolumesUsed(client, tableNames[1], true, v2);
-    }
-  }
-
-  private void testReplaceVolume(AccumuloClient client, boolean cleanShutdown, boolean rangedFiles)
-      throws Exception {
+  protected void testReplaceVolume(AccumuloClient client, boolean cleanShutdown,
+      boolean rangedFiles) throws Exception {
     String[] tableNames = getUniqueNames(3);
 
     verifyVolumesUsed(client, tableNames[0], false, v1, v2);
@@ -498,29 +336,8 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  @Test
-  public void testDirtyReplaceVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, false, false);
-    }
-  }
-
-  @Test
-  public void testCleanReplaceVolumesWithRangedFiles() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, true, true);
-    }
-  }
-
-  @Test
-  public void testDirtyReplaceVolumesWithRangedFiles() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, false, true);
-    }
-  }
-
   @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
-  private void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
+  protected void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
     var file = new File(cluster.getAccumuloPropertiesPath());
     var config = new PropertiesConfiguration();
     try (FileReader out = new FileReader(file, UTF_8)) {
@@ -550,7 +367,7 @@ public class VolumeIT extends ConfigurableMacBase {
       mutator.deleteFile(file);
 
       // Find the midpoint and create two new files, each with a range covering half the file
-      Text tabletMidPoint = getTabletMidPoint(tm.getExtent().endRow());
+      Text tabletMidPoint = VolumeITBase.getTabletMidPoint(tm.getExtent().endRow());
       // Handle edge case for last tablet
       if (tabletMidPoint == null) {
         tabletMidPoint = new Text(
@@ -565,9 +382,4 @@ public class VolumeIT extends ConfigurableMacBase {
           new Range(tabletMidPoint, false, tm.getExtent().endRow(), true)), newValue);
     });
   }
-
-  private static Text getTabletMidPoint(Text row) {
-    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
-        : null;
-  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java b/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
index 357be1aaff..209cfbbbb8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
@@ -19,12 +19,15 @@
 package org.apache.accumulo.test.ample;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.ample.metadata.TestAmple;
 
+import com.google.common.base.Suppliers;
+
 /**
  * A goal of this class is to exercise the lambdas passed to
  * {@link org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator#submit(Ample.RejectionHandler)}.
@@ -33,13 +36,22 @@ import org.apache.accumulo.test.ample.metadata.TestAmple;
  */
 public class FlakyAmpleServerContext extends ServerContext {
 
+  private final Supplier<Ample> ampleSupplier;
+
   public FlakyAmpleServerContext(SiteConfiguration siteConfig) {
     super(siteConfig);
+    // Each instance of TestAmple created will create a new Hadoop configuration object. These
+    // seemed to hang around and cause OOME and process death. Did not track down why they were
+    // hanging around, but decided to avoid creating a new instance of TestAmple each time Ample
+    // is requested in order to avoid creating those hadoop config objects.
+    ampleSupplier = Suppliers.memoize(() -> TestAmple.create(
+        this, Map.of(Ample.DataLevel.USER, Ample.DataLevel.USER.metaTable(),
+            Ample.DataLevel.METADATA, Ample.DataLevel.METADATA.metaTable()),
+        FlakyInterceptor::new));
   }
 
   @Override
   public Ample getAmple() {
-    return TestAmple.create(this, Map.of(Ample.DataLevel.USER, Ample.DataLevel.USER.metaTable(),
-        Ample.DataLevel.METADATA, Ample.DataLevel.METADATA.metaTable()), FlakyInterceptor::new);
+    return ampleSupplier.get();
   }
 }
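
Two small standalone sketches may help map the commit message onto the diff above. Neither is part of the commit, and the class and variable names in them are illustrative only.

First, the rejection handler added to TabletGroupWatcher treats a volume-replacement mutation as applied once the tablet no longer references anything on the old volumes. A minimal plain-JDK sketch of that set check, assuming simple strings in place of Accumulo's file and log types:

    import java.util.Collections;
    import java.util.Set;

    public class DisjointCheckSketch {
      public static void main(String[] args) {
        // Files the tablet metadata references after the conditional mutation ran.
        Set<String> tabletFiles = Set.of("file:/v8/t1/f1.rf", "file:/v8/t1/f2.rf");
        // Old-volume files the volume replacement was supposed to remove.
        Set<String> filesToRemove = Set.of("file:/v1/t1/f1.rf", "file:/v1/t1/f2.rf");

        // Collections.disjoint is true when the sets share no elements, i.e. no
        // old-volume file is still referenced. Checking instead that the *new*
        // files are present would race with compactions, which can legitimately
        // add and then remove those files before the check runs.
        boolean filesRemoved = Collections.disjoint(tabletFiles, filesToRemove);
        System.out.println("treat mutation as applied: " + filesRemoved); // true
      }
    }

Second, the OOME fix in FlakyAmpleServerContext relies on Guava's Suppliers.memoize, which invokes the wrapped factory at most once and caches the result, so each ServerContext now yields a single TestAmple and a single hadoop configuration object. A small sketch of the pattern, using a hypothetical Heavy class:

    import java.util.function.Supplier;

    import com.google.common.base.Suppliers;

    public class MemoizeSketch {
      static class Heavy {
        Heavy() {
          System.out.println("expensive construction, runs once");
        }
      }

      public static void main(String[] args) {
        Supplier<Heavy> cached = Suppliers.memoize(Heavy::new);
        // The factory runs on the first get(); later calls return the same instance.
        System.out.println(cached.get() == cached.get()); // true
      }
    }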