This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new c1b1781f0e test volume replacement with flaky ample (#4781)
c1b1781f0e is described below

commit c1b1781f0e30a4b110946c9429a99e4918d08261
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Aug 1 08:29:09 2024 -0700

    test volume replacement with flaky ample (#4781)
    
    The goal of these changes is to exercise the conditional mutation code
    used to replace volumes in tablets.
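    
    The conditional mutation change in TabletGroupWatcher below hinges on
    Collections.disjoint. A minimal, self-contained sketch of that idiom
    (the class name, sets, and paths here are made up for illustration;
    only the Collections.disjoint call mirrors the real check):
    
        import java.util.Collections;
        import java.util.Set;
        
        public class DisjointCheckSketch {
          public static void main(String[] args) {
            // Files a tablet currently references, all on the new volume v8.
            Set<String> currentFiles = Set.of("file:/v8/t1/A.rf", "file:/v8/t1/B.rf");
            // Old-volume files the replacement mutation was supposed to delete.
            Set<String> filesToRemove = Set.of("file:/v1/t1/A.rf", "file:/v1/t1/B.rf");
    
            // disjoint == true means no old-volume file is still referenced, so a
            // "rejected" conditional mutation can be treated as having succeeded.
            // Checking that the new files were added would race with a compaction
            // that adds and then removes them; once the old volume's entries are
            // gone, nothing ever adds them back.
            boolean filesRemoved = Collections.disjoint(currentFiles, filesToRemove);
            System.out.println("filesRemoved = " + filesRemoved); // prints true
          }
        }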
    
    Refactored some code from VolumeIT into VolumeITBase so that
    VolumeITBase could be extended by VolumeFlakyAmpleIT, which injects a
    manager and tservers that use FlakyAmple. Ran into two problems during
    this change. The first was that manager and tserver processes were
    dying because of OOME. Heap dumps showed that TestAmple objects were
    continually being created and that each TestAmple object created a
    hadoop configuration object; eventually memory would fill up with
    these configuration objects. It is not fully understood why the hadoop
    config objects never went away, since the TestAmple objects should
    have been transient; this may be worth further investigation. In this
    commit only a single TestAmple object per ServerContext is created,
    which solves the problem.
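    
    A minimal sketch of the memoization pattern behind the fix, assuming
    Guava on the classpath (the FlakyAmpleServerContext change in the diff
    below uses this same pattern; the class and factory names here are
    stand-ins for the real TestAmple.create call):
    
        import java.util.function.Supplier;
        
        import com.google.common.base.Suppliers;
        
        public class MemoizeSketch {
          // Memoize so the expensive factory runs at most once per instance,
          // instead of building a new hadoop Configuration on every call.
          private final Supplier<Object> ample = Suppliers.memoize(MemoizeSketch::createAmple);
        
          public Object getAmple() {
            return ample.get(); // every caller gets the same cached instance
          }
        
          private static Object createAmple() {
            // Stand-in for TestAmple.create(...), which builds a hadoop
            // Configuration each time it runs.
            return new Object();
          }
        }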
    
    The second problem was that VolumeIT created 2 tables with 100 tablets
    each. When using FlakyAmple all tablet operations take longer, and the
    test ended up timing out because of this. To solve this problem,
    lowered the number of tablets and increased the number of compactors.
    
    
    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 .../accumulo/manager/TabletGroupWatcher.java       |  23 +-
 .../apache/accumulo/test/VolumeFlakyAmpleIT.java   |  53 ++++
 .../java/org/apache/accumulo/test/VolumeIT.java    | 329 +--------------------
 .../test/{VolumeIT.java => VolumeITBase.java}      | 256 +++-------------
 .../test/ample/FlakyAmpleServerContext.java        |  16 +-
 5 files changed, 120 insertions(+), 557 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 6ef007449f..d621b65a46 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -44,7 +44,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Scanner;
@@ -64,7 +63,6 @@ import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample;
@@ -742,8 +740,13 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
           // replacement. We only want to stop looking for tablets that need volume replacement when
           // we have successfully processed all tablet metadata and no more volume replacements are
           // being performed.
+          Manager.log.debug("[{}] saw {} tablets needing volume replacement", store.name(),
+              tabletMgmtStats.totalVolumeReplacements);
           lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0
               || tabletMgmtStats.tabletsWithErrors != 0;
+          if (!lookForTabletsNeedingVolReplacement) {
+            Manager.log.debug("[{}] no longer looking for volume replacements", store.name());
+          }
         }
 
        // provide stats after flushing changes to avoid race conditions w/ delete table
@@ -1048,9 +1051,19 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
         vr.filesToRemove.forEach(tabletMutator::deleteFile);
         vr.filesToAdd.forEach(tabletMutator::putFile);
 
-        tabletMutator.submit(
-            tm -> tm.getLogs().containsAll(vr.logsToAdd) && tm.getFiles().containsAll(vr.filesToAdd
-                .keySet().stream().map(ReferencedTabletFile::insert).collect(Collectors.toSet())));
+        tabletMutator.submit(tm -> {
+          // Check that the old logs and files were removed. Checking if the new files or logs were
+          // added has a race condition, as those could have been successfully added and then
+          // removed before this check runs, like if a compaction runs. Once the old volumes are
+          // removed nothing should ever add them again.
+          var logsRemoved =
+              Collections.disjoint(Set.copyOf(tm.getLogs()), Set.copyOf(vr.logsToRemove));
+          var filesRemoved = Collections.disjoint(tm.getFiles(), Set.copyOf(vr.filesToRemove));
+          LOG.debug(
+              "replaceVolume conditional mutation rejection check {} logsRemoved:{} filesRemoved:{}",
+              tm.getExtent(), logsRemoved, filesRemoved);
+          return logsRemoved && filesRemoved;
+        });
       }
 
       tabletsMutator.process().forEach((extent, result) -> {
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java
new file mode 100644
index 0000000000..1262c00fa0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeFlakyAmpleIT.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.TreeSet;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.ample.FlakyAmpleManager;
+import org.apache.accumulo.test.ample.FlakyAmpleTserver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+public class VolumeFlakyAmpleIT extends VolumeITBase {
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    super.configure(cfg, hadoopCoreSite);
+    cfg.setServerClass(ServerType.MANAGER, FlakyAmpleManager.class);
+    cfg.setServerClass(ServerType.TABLET_SERVER, FlakyAmpleTserver.class);
+    // The test creates a lot of tablets that need to compact. Reserving and committing
+    // compactions is slower because of FlakyAmple causing conditional mutations to fail. So start
+    // more compactors to compensate for this.
+    cfg.getClusterServerConfiguration().setNumDefaultCompactors(3);
+  }
+
+  @Override
+  protected TreeSet<Text> generateSplits() {
+    // The regular version of this test creates 100 tablets. However, 100 tablets and FlakyAmple
+    // causing each tablet operation to take longer results in longer test run times. So lower the
+    // number of tablets to 10 to speed up the test with flaky ample.
+    TreeSet<Text> splits = new TreeSet<>();
+    for (int i = 10; i < 100; i += 10) {
+      splits.add(new Text(String.format("%06d", i * 100)));
+    }
+    return splits;
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index 25bea4e892..e6336d6d35 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -18,16 +18,9 @@
  */
 package org.apache.accumulo.test;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,94 +29,34 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.init.Initialize;
-import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
-import org.apache.accumulo.server.log.WalStateManager.WalState;
-import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.Admin;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.accumulo.test.util.FileMetadataUtil;
-import org.apache.commons.configuration2.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.jupiter.api.Test;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-public class VolumeIT extends ConfigurableMacBase {
-
-  private File volDirBase;
-  private Path v1, v2, v3;
-  private List<String> expected = new ArrayList<>();
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    File baseDir = cfg.getDir();
-    volDirBase = new File(baseDir, "volumes");
-    File v1f = new File(volDirBase, "v1");
-    File v2f = new File(volDirBase, "v2");
-    v1 = new Path("file://" + v1f.getAbsolutePath());
-    v2 = new Path("file://" + v2f.getAbsolutePath());
-    File v3f = new File(volDirBase, "v3");
-    v3 = new Path("file://" + v3f.getAbsolutePath());
-    // setup expected rows
-    for (int i = 0; i < 100; i++) {
-      String row = String.format("%06d", i * 100 + 3);
-      expected.add(row + ":cf1:cq1:1");
-    }
-
-    // Run MAC on two locations in the local file system
-    cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
-    cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "15s");
-
-    // use raw local file system so walogs sync and flush will work
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-
-    super.configure(cfg, hadoopCoreSite);
-  }
+public class VolumeIT extends VolumeITBase {
 
   @Test
   public void test() throws Exception {
@@ -174,23 +107,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void verifyData(List<String> expected, Scanner createScanner) {
-
-    List<String> actual = new ArrayList<>();
-
-    for (Entry<Key,Value> entry : createScanner) {
-      Key k = entry.getKey();
-      actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
-          + entry.getValue());
-    }
-
-    Collections.sort(expected);
-    Collections.sort(actual);
-
-    createScanner.close();
-    assertEquals(expected, actual);
-  }
-
   @Test
   public void testAddVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -263,111 +179,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void writeData(String tableName, AccumuloClient client) throws AccumuloException,
-      AccumuloSecurityException, TableExistsException, TableNotFoundException {
-
-    TreeSet<Text> splits = new TreeSet<>();
-    for (int i = 1; i < 100; i++) {
-      splits.add(new Text(String.format("%06d", i * 100)));
-    }
-
-    NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
-    client.tableOperations().create(tableName, ntc);
-
-    try (BatchWriter bw = client.createBatchWriter(tableName)) {
-      for (int i = 0; i < 100; i++) {
-        String row = String.format("%06d", i * 100 + 3);
-        Mutation m = new Mutation(row);
-        m.put("cf1", "cq1", "1");
-        bw.addMutation(m);
-      }
-    }
-  }
-
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
-      Path... paths) throws Exception {
-    verifyVolumesUsed(client, tableName, shouldExist, false, paths);
-  }
-
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
-      boolean rangedFiles, Path... paths) throws Exception {
-
-    if (!client.tableOperations().exists(tableName)) {
-      assertFalse(shouldExist);
-
-      writeData(tableName, client);
-
-      verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
-
-      client.tableOperations().flush(tableName, null, null, true);
-    }
-
-    verifyData(expected, client.createScanner(tableName, Authorizations.EMPTY));
-
-    TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
-    try (Scanner metaScanner =
-        client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-      metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      metaScanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
-
-      int[] counts = new int[paths.length];
-
-      outer: for (Entry<Key,Value> entry : metaScanner) {
-        String path = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath();
-
-        for (int i = 0; i < paths.length; i++) {
-          if (path.contains(paths[i].toString())) {
-            counts[i]++;
-            continue outer;
-          }
-        }
-
-        fail("Unexpected volume " + path);
-      }
-
-      // keep retrying until WAL state information in ZooKeeper stabilizes or until test times out
-      retry: while (true) {
-        WalStateManager wals = new WalStateManager(getServerContext());
-        try {
-          outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
-            for (Path path : paths) {
-              if (entry.getKey().toString().startsWith(path.toString())) {
-                continue outer;
-              }
-            }
-            log.warn("Unexpected volume " + entry.getKey() + " (" + entry.getValue() + ")");
-            UtilWaitThread.sleep(100);
-            continue retry;
-          }
-        } catch (WalMarkerException e) {
-          Throwable cause = e.getCause();
-          if (cause instanceof NoNodeException) {
-            // ignore WALs being cleaned up
-            continue retry;
-          }
-          throw e;
-        }
-        break;
-      }
-
-      // if a volume is chosen randomly for each tablet, then the probability that a volume will not
-      // be chosen for any tablet is ((num_volumes -
-      // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2
-      // volumes would be chosen is 2.46e-18
-
-      int sum = 0;
-      for (int count : counts) {
-        assertTrue(count > 0);
-        sum += count;
-      }
-
-      // When ranged files exist we there should be twice as many
-      // as the test split each file into 2
-      int expectedCount = rangedFiles ? 200 : 100;
-      assertEquals(expectedCount, sum);
-    }
-  }
-
   @Test
   public void testRemoveVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -411,93 +222,6 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void testReplaceVolume(AccumuloClient client, boolean cleanShutdown, boolean rangedFiles)
-      throws Exception {
-    String[] tableNames = getUniqueNames(3);
-
-    verifyVolumesUsed(client, tableNames[0], false, v1, v2);
-
-    // write to 2nd table, but do not flush data to disk before shutdown
-    try (AccumuloClient c2 =
-        cluster.createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD))) {
-      writeData(tableNames[1], c2);
-    }
-
-    // If flag is true then for each file split and create two files
-    // to verify volume replacement works on files with ranges
-    if (rangedFiles) {
-      splitFilesWithRange(client, tableNames[0]);
-      splitFilesWithRange(client, tableNames[1]);
-    }
-
-    if (cleanShutdown) {
-      assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-    }
-
-    cluster.stop();
-
-    File v1f = new File(v1.toUri());
-    File v8f = new File(new File(v1.getParent().toUri()), "v8");
-    assertTrue(v1f.renameTo(v8f), "Failed to rename " + v1f + " to " + v8f);
-    Path v8 = new Path(v8f.toURI());
-
-    File v2f = new File(v2.toUri());
-    File v9f = new File(new File(v2.getParent().toUri()), "v9");
-    assertTrue(v2f.renameTo(v9f), "Failed to rename " + v2f + " to " + v9f);
-    Path v9 = new Path(v9f.toURI());
-
-    updateConfig(config -> {
-      config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
-      config.setProperty(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(),
-          v1 + " " + v8 + "," + v2 + " " + v9);
-    });
-
-    // start cluster and verify that volumes were replaced
-    cluster.start();
-
-    verifyVolumesUsed(client, tableNames[0], true, rangedFiles, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, rangedFiles, v8, v9);
-
-    // verify writes to new dir
-    client.tableOperations().compact(tableNames[0], null, null, true, true);
-    client.tableOperations().compact(tableNames[1], null, null, true, true);
-
-    // Always pass false for ranged files as compaction will clean them up if exist
-    verifyVolumesUsed(client, tableNames[0], true, false, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, false, v8, v9);
-
-    client.tableOperations().compact(AccumuloTable.ROOT.tableName(),
-        new CompactionConfig().setWait(true));
-
-    // check that root tablet is not on volume 1 or 2
-    int count = 0;
-    for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
-        .getFiles()) {
-      assertTrue(file.getMetadataPath().startsWith(v8.toString())
-          || file.getMetadataPath().startsWith(v9.toString()));
-      count++;
-    }
-
-    assertTrue(count > 0);
-
-    client.tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<>(),
-        new HashSet<>());
-
-    client.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null, true);
-    client.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null, true);
-
-    verifyVolumesUsed(client, tableNames[0], true, v8, v9);
-    verifyVolumesUsed(client, tableNames[1], true, v8, v9);
-    verifyVolumesUsed(client, tableNames[2], true, v8, v9);
-  }
-
-  @Test
-  public void testCleanReplaceVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, true, false);
-    }
-  }
-
   @Test
   public void testDirtyReplaceVolumes() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
@@ -519,55 +243,4 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
-  private void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
-    var file = new File(cluster.getAccumuloPropertiesPath());
-    var config = new PropertiesConfiguration();
-    try (FileReader out = new FileReader(file, UTF_8)) {
-      config.read(out);
-    }
-    updater.accept(config);
-    try (FileWriter out = new FileWriter(file, UTF_8)) {
-      config.write(out);
-    }
-  }
-
-  // Go through each tablet file in metadata and split the files into two files
-  // by adding two new entries that covers half of the file. This will test that
-  // files with ranges work properly with volume replacement
-  private void splitFilesWithRange(AccumuloClient client, String tableName) throws Exception {
-    client.securityOperations().grantTablePermission(cluster.getConfig().getRootUserName(),
-        AccumuloTable.METADATA.tableName(), TablePermission.WRITE);
-    final ServerContext ctx = getServerContext();
-    ctx.setCredentials(new SystemCredentials(client.instanceOperations().getInstanceId(), "root",
-        new PasswordToken(ROOT_PASSWORD)));
-
-    AtomicInteger i = new AtomicInteger();
-    FileMetadataUtil.mutateTabletFiles(ctx, tableName, null, null, (tm, mutator, file, value) -> {
-      i.incrementAndGet();
-
-      // Create a mutation to delete the existing file metadata entry with infinite range
-      mutator.deleteFile(file);
-
-      // Find the midpoint and create two new files, each with a range covering half the file
-      Text tabletMidPoint = getTabletMidPoint(tm.getExtent().endRow());
-      // Handle edge case for last tablet
-      if (tabletMidPoint == null) {
-        tabletMidPoint = new Text(
-            String.format("%06d", Integer.parseInt(tm.getExtent().prevEndRow().toString()) + 50));
-      }
-
-      final DataFileValue newValue = new DataFileValue(Integer.max(1, (int) (value.getSize() / 2)),
-          Integer.max(1, (int) (value.getNumEntries() / 2)));
-      mutator.putFile(StoredTabletFile.of(file.getPath(),
-          new Range(tm.getExtent().prevEndRow(), false, tabletMidPoint, true)), newValue);
-      mutator.putFile(StoredTabletFile.of(file.getPath(),
-          new Range(tabletMidPoint, false, tm.getExtent().endRow(), true)), newValue);
-    });
-  }
-
-  private static Text getTabletMidPoint(Text row) {
-    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
-        : null;
-  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
similarity index 61%
copy from test/src/main/java/org/apache/accumulo/test/VolumeIT.java
copy to test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
index 25bea4e892..2622824151 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
@@ -28,18 +28,15 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedSet;
+import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -49,13 +46,11 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
-import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -66,42 +61,42 @@ import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.init.Initialize;
 import org.apache.accumulo.server.log.WalStateManager;
-import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
-import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.accumulo.test.util.FileMetadataUtil;
 import org.apache.commons.configuration2.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.Test;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
-public class VolumeIT extends ConfigurableMacBase {
+public abstract class VolumeITBase extends ConfigurableMacBase {
+  protected Path v1;
+  protected Path v2;
+  protected Path v3;
+  protected List<String> expected = new ArrayList<>();
 
-  private File volDirBase;
-  private Path v1, v2, v3;
-  private List<String> expected = new ArrayList<>();
+  private static Text getTabletMidPoint(Text row) {
+    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
+        : null;
+  }
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     File baseDir = cfg.getDir();
-    volDirBase = new File(baseDir, "volumes");
+    File volDirBase = new File(baseDir, "volumes");
     File v1f = new File(volDirBase, "v1");
     File v2f = new File(volDirBase, "v2");
     v1 = new Path("file://" + v1f.getAbsolutePath());
@@ -125,60 +120,11 @@ public class VolumeIT extends ConfigurableMacBase {
     super.configure(cfg, hadoopCoreSite);
   }
 
-  @Test
-  public void test() throws Exception {
-    // create a table
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String tableName = getUniqueNames(1)[0];
-      // create set of splits
-      SortedSet<Text> partitions = new TreeSet<>();
-      for (String s : "d,m,t".split(",")) {
-        partitions.add(new Text(s));
-      }
-      // create table with splits
-      NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitions);
-      client.tableOperations().create(tableName, ntc);
-      // scribble over the splits
-      VolumeChooserIT.writeDataToTable(client, tableName, VolumeChooserIT.alpha_rows);
-      // write the data to disk, read it back
-      client.tableOperations().flush(tableName, null, null, true);
-      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
-        int i = 0;
-        for (Entry<Key,Value> entry : scanner) {
-          assertEquals(VolumeChooserIT.alpha_rows[i++], entry.getKey().getRow().toString());
-        }
-      }
-      // verify the new files are written to the different volumes
-      try (Scanner scanner =
-          client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-        scanner.setRange(new Range("1", "1<"));
-        scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-        int fileCount = 0;
-
-        for (Entry<Key,Value> entry : scanner) {
-          boolean inV1 = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath()
-              .contains(v1.toString());
-          boolean inV2 = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath()
-              .contains(v2.toString());
-          assertTrue(inV1 || inV2);
-          fileCount++;
-        }
-        assertEquals(4, fileCount);
-        List<DiskUsage> diskUsage =
-            client.tableOperations().getDiskUsage(Collections.singleton(tableName));
-        assertEquals(1, diskUsage.size());
-        long usage = diskUsage.get(0).getUsage();
-        log.debug("usage {}", usage);
-        assertTrue(usage > 700 && usage < 900);
-      }
-    }
-  }
-
-  private void verifyData(List<String> expected, Scanner createScanner) {
+  protected void verifyData(List<String> expected, Scanner createScanner) {
 
     List<String> actual = new ArrayList<>();
 
-    for (Entry<Key,Value> entry : createScanner) {
+    for (Map.Entry<Key,Value> entry : createScanner) {
       Key k = entry.getKey();
       actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":"
           + entry.getValue());
@@ -191,85 +137,18 @@ public class VolumeIT extends ConfigurableMacBase {
     assertEquals(expected, actual);
   }
 
-  @Test
-  public void testAddVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String[] tableNames = getUniqueNames(2);
-
-      InstanceId uuid = verifyAndShutdownCluster(client, tableNames[0]);
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(),
-          v1 + "," + v2 + "," + v3));
-
-      // initialize volume
-      assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
-
-      checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
-
-      // start cluster and verify that new volume is used
-      cluster.start();
-
-      verifyVolumesUsed(client, tableNames[1], false, v1, v2, v3);
-    }
-  }
-
-  // grab uuid before shutting down cluster
-  private InstanceId verifyAndShutdownCluster(AccumuloClient c, String tableName) throws Exception {
-    InstanceId uuid = c.instanceOperations().getInstanceId();
-
-    verifyVolumesUsed(c, tableName, false, v1, v2);
-
-    assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-    cluster.stop();
-
-    return uuid;
-  }
-
-  @Test
-  public void testNonConfiguredVolumes() throws Exception {
-
-    String[] tableNames = getUniqueNames(2);
-
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-
-      InstanceId uuid = verifyAndShutdownCluster(client, tableNames[0]);
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2 + "," + v3));
-
-      // initialize volume
-      assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").getProcess().waitFor());
-
-      checkVolumesInitialized(Arrays.asList(v1, v2, v3), uuid);
-
-      // start cluster and verify that new volume is used
-      cluster.start();
-
-      // verify we can still read the tables (tableNames[0] is likely to have a file still on v1)
-      verifyData(expected, client.createScanner(tableNames[0], Authorizations.EMPTY));
-
-      // v1 should not have any data for tableNames[1]
-      verifyVolumesUsed(client, tableNames[1], false, v2, v3);
-    }
-  }
-
-  // check that all volumes are initialized
-  private void checkVolumesInitialized(List<Path> volumes, InstanceId uuid) throws Exception {
-    for (Path volumePath : volumes) {
-      FileSystem fs = volumePath.getFileSystem(cluster.getServerContext().getHadoopConf());
-      Path vp = new Path(volumePath, Constants.INSTANCE_ID_DIR);
-      FileStatus[] iids = fs.listStatus(vp);
-      assertEquals(1, iids.length);
-      assertEquals(uuid.canonical(), iids[0].getPath().getName());
+  protected TreeSet<Text> generateSplits() {
+    TreeSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 100; i++) {
+      splits.add(new Text(String.format("%06d", i * 100)));
     }
+    return splits;
   }
 
   private void writeData(String tableName, AccumuloClient client) throws AccumuloException,
       AccumuloSecurityException, TableExistsException, TableNotFoundException {
 
-    TreeSet<Text> splits = new TreeSet<>();
-    for (int i = 1; i < 100; i++) {
-      splits.add(new Text(String.format("%06d", i * 100)));
-    }
+    TreeSet<Text> splits = generateSplits();
 
     NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
     client.tableOperations().create(tableName, ntc);
@@ -284,7 +163,7 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
+  protected void verifyVolumesUsed(AccumuloClient client, String tableName, boolean shouldExist,
       Path... paths) throws Exception {
     verifyVolumesUsed(client, tableName, shouldExist, false, paths);
   }
@@ -307,12 +186,12 @@ public class VolumeIT extends ConfigurableMacBase {
     TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
     try (Scanner metaScanner =
         client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
-      metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
       metaScanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
 
       int[] counts = new int[paths.length];
 
-      outer: for (Entry<Key,Value> entry : metaScanner) {
+      outer: for (Map.Entry<Key,Value> entry : metaScanner) {
         String path = StoredTabletFile.of(entry.getKey().getColumnQualifier()).getMetadataPath();
 
         for (int i = 0; i < paths.length; i++) {
@@ -329,7 +208,8 @@ public class VolumeIT extends ConfigurableMacBase {
       retry: while (true) {
         WalStateManager wals = new WalStateManager(getServerContext());
         try {
-          outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+          outer: for (Map.Entry<Path,WalStateManager.WalState> entry : wals.getAllState()
+              .entrySet()) {
             for (Path path : paths) {
               if (entry.getKey().toString().startsWith(path.toString())) {
                 continue outer;
@@ -339,9 +219,9 @@ public class VolumeIT extends ConfigurableMacBase {
             UtilWaitThread.sleep(100);
             continue retry;
           }
-        } catch (WalMarkerException e) {
+        } catch (WalStateManager.WalMarkerException e) {
           Throwable cause = e.getCause();
-          if (cause instanceof NoNodeException) {
+          if (cause instanceof KeeperException.NoNodeException) {
             // ignore WALs being cleaned up
             continue retry;
           }
@@ -363,56 +243,14 @@ public class VolumeIT extends ConfigurableMacBase {
 
       // When ranged files exist we there should be twice as many
       // as the test split each file into 2
-      int expectedCount = rangedFiles ? 200 : 100;
+      int numTablets = generateSplits().size() + 1;
+      int expectedCount = rangedFiles ? numTablets * 2 : numTablets;
       assertEquals(expectedCount, sum);
     }
   }
 
-  @Test
-  public void testRemoveVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      String[] tableNames = getUniqueNames(2);
-
-      verifyVolumesUsed(client, tableNames[0], false, v1, v2);
-
-      assertEquals(0, cluster.exec(Admin.class, "stopAll").getProcess().waitFor());
-      cluster.stop();
-
-      updateConfig(config -> config.setProperty(Property.INSTANCE_VOLUMES.getKey(), v2.toString()));
-
-      // start cluster and verify that volume was decommissioned
-      cluster.start();
-
-      client.tableOperations().compact(tableNames[0], null, null, true, true);
-
-      verifyVolumesUsed(client, tableNames[0], true, v2);
-
-      client.tableOperations().compact(AccumuloTable.ROOT.tableName(),
-          new CompactionConfig().setWait(true));
-
-      // check that root tablet is not on volume 1
-      int count = 0;
-      for (StoredTabletFile file : ((ClientContext) client).getAmple().readTablet(RootTable.EXTENT)
-          .getFiles()) {
-        assertTrue(file.getMetadataPath().startsWith(v2.toString()));
-        count++;
-      }
-
-      assertTrue(count > 0);
-
-      client.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<>(),
-          new HashSet<>());
-
-      client.tableOperations().flush(AccumuloTable.METADATA.tableName(), null, null, true);
-      client.tableOperations().flush(AccumuloTable.ROOT.tableName(), null, null, true);
-
-      verifyVolumesUsed(client, tableNames[0], true, v2);
-      verifyVolumesUsed(client, tableNames[1], true, v2);
-    }
-  }
-
-  private void testReplaceVolume(AccumuloClient client, boolean cleanShutdown, boolean rangedFiles)
-      throws Exception {
+  protected void testReplaceVolume(AccumuloClient client, boolean cleanShutdown,
+      boolean rangedFiles) throws Exception {
     String[] tableNames = getUniqueNames(3);
 
     verifyVolumesUsed(client, tableNames[0], false, v1, v2);
@@ -498,29 +336,8 @@ public class VolumeIT extends ConfigurableMacBase {
     }
   }
 
-  @Test
-  public void testDirtyReplaceVolumes() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, false, false);
-    }
-  }
-
-  @Test
-  public void testCleanReplaceVolumesWithRangedFiles() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, true, true);
-    }
-  }
-
-  @Test
-  public void testDirtyReplaceVolumesWithRangedFiles() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
-      testReplaceVolume(client, false, true);
-    }
-  }
-
   @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths provided by test")
-  private void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
+  protected void updateConfig(Consumer<PropertiesConfiguration> updater) throws Exception {
     var file = new File(cluster.getAccumuloPropertiesPath());
     var config = new PropertiesConfiguration();
     try (FileReader out = new FileReader(file, UTF_8)) {
@@ -550,7 +367,7 @@ public class VolumeIT extends ConfigurableMacBase {
       mutator.deleteFile(file);
 
       // Find the midpoint and create two new files, each with a range covering half the file
-      Text tabletMidPoint = getTabletMidPoint(tm.getExtent().endRow());
+      Text tabletMidPoint = VolumeITBase.getTabletMidPoint(tm.getExtent().endRow());
       // Handle edge case for last tablet
       if (tabletMidPoint == null) {
         tabletMidPoint = new Text(
@@ -565,9 +382,4 @@ public class VolumeIT extends ConfigurableMacBase {
           new Range(tabletMidPoint, false, tm.getExtent().endRow(), true)), newValue);
     });
   }
-
-  private static Text getTabletMidPoint(Text row) {
-    return row != null ? new Text(String.format("%06d", Integer.parseInt(row.toString()) - 50))
-        : null;
-  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java b/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
index 357be1aaff..209cfbbbb8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/FlakyAmpleServerContext.java
@@ -19,12 +19,15 @@
 package org.apache.accumulo.test.ample;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.ample.metadata.TestAmple;
 
+import com.google.common.base.Suppliers;
+
 /**
  * A goal of this class is to exercise the lambdas passed to
  * {@link org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator#submit(Ample.RejectionHandler)}.
@@ -33,13 +36,22 @@ import org.apache.accumulo.test.ample.metadata.TestAmple;
  */
 public class FlakyAmpleServerContext extends ServerContext {
 
+  private final Supplier<Ample> ampleSupplier;
+
   public FlakyAmpleServerContext(SiteConfiguration siteConfig) {
     super(siteConfig);
+    // Each instance of TestAmple created will create a new Hadoop configuration object. These
+    // seemed to hang around and cause OOME and process death. Did not track down why they were
+    // hanging around, but decided to avoid creating a new instance of TestAmple each time Ample is
+    // requested in order to avoid creating those hadoop config objects.
+    ampleSupplier = Suppliers.memoize(() -> TestAmple.create(
+        this, Map.of(Ample.DataLevel.USER, Ample.DataLevel.USER.metaTable(),
+            Ample.DataLevel.METADATA, Ample.DataLevel.METADATA.metaTable()),
+        FlakyInterceptor::new));
   }
 
   @Override
   public Ample getAmple() {
-    return TestAmple.create(this, Map.of(Ample.DataLevel.USER, Ample.DataLevel.USER.metaTable(),
-        Ample.DataLevel.METADATA, Ample.DataLevel.METADATA.metaTable()), FlakyInterceptor::new);
+    return ampleSupplier.get();
   }
 }

