This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit bb673da62f321d02e597ab8ea33f57a95c7607ab
Merge: 18e9a3453b 238a5226d3
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Jun 15 12:14:07 2023 +0000

    Merge branch 'main' into elasticity

 .../apache/accumulo/core/metadata/TabletFile.java  |  13 +-
 pom.xml                                            |   3 +-
 .../accumulo/server/conf/util/ZooInfoViewer.java   | 162 +----------
 .../accumulo/server/conf/util/ZooPropEditor.java   | 305 +++++++++++++++++++++
 .../accumulo/server/conf/util/ZooPropUtils.java    | 140 ++++++++++
 .../server/conf/util/ZooInfoViewerTest.java        |  86 +-----
 .../server/conf/util/ZooPropEditorTest.java        |  30 +-
 .../server/conf/util/ZooPropUtilsTest.java         |  68 +++++
 .../accumulo/manager/TabletGroupWatcher.java       |  21 +-
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  |   2 +-
 .../accumulo/monitor/resources/css/screen.css      |   3 -
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  35 ++-
 .../accumulo/start/spi/KeywordExecutable.java      |   5 +-
 .../accumulo/test/conf/util/ZooPropEditorIT.java   | 138 ++++++++++
 .../test/functional/GarbageCollectorTrashBase.java |   2 +-
 ...ageCollectorTrashEnabledWithCustomPolicyIT.java |   2 +-
 .../apache/accumulo/test/functional/SplitIT.java   |  76 +++++
 .../apache/accumulo/test/start/KeywordStartIT.java |  71 +++--
 18 files changed, 853 insertions(+), 309 deletions(-)

diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 44fb1ba38f,807387d87e..b5f46e70f2
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -170,12 -145,11 +170,11 @@@ abstract class TabletGroupWatcher exten
    private static class TabletLists {
      private final List<Assignment> assignments = new ArrayList<>();
      private final List<Assignment> assigned = new ArrayList<>();
 -    private final List<TabletLocationState> assignedToDeadServers = new 
ArrayList<>();
 -    private final List<TabletLocationState> suspendedToGoneServers = new 
ArrayList<>();
 +    private final List<TabletMetadata> assignedToDeadServers = new 
ArrayList<>();
 +    private final List<TabletMetadata> suspendedToGoneServers = new 
ArrayList<>();
      private final Map<KeyExtent,UnassignedTablet> unassigned = new 
HashMap<>();
      private final Map<TServerInstance,List<Path>> logsForDeadServers = new 
TreeMap<>();
-     // read only lists of tablet servers
-     private final SortedMap<TServerInstance,TabletServerStatus> 
currentTServers;
+     // read only list of tablet servers that are not shutting down
      private final SortedMap<TServerInstance,TabletServerStatus> destinations;
  
      public TabletLists(Manager m, 
SortedMap<TServerInstance,TabletServerStatus> curTServers) {
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 8678ff5f49,6ca58a4fe1..b60c5e153c
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@@ -18,34 -18,27 +18,40 @@@
   */
  package org.apache.accumulo.test.functional;
  
+ import static java.util.Collections.singletonMap;
  import static java.util.concurrent.TimeUnit.SECONDS;
  import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
  import static org.junit.jupiter.api.Assumptions.assumeTrue;
  
  import java.time.Duration;
 +import java.util.ArrayList;
+ import java.util.Base64;
  import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
+ import java.util.Random;
 +import java.util.Set;
 +import java.util.TreeSet;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.stream.Collectors;
  
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.client.admin.InstanceOperations;
  import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+ import org.apache.accumulo.core.client.rfile.RFile;
+ import org.apache.accumulo.core.client.rfile.RFileWriter;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
@@@ -65,15 -56,15 +71,18 @@@ import org.apache.accumulo.test.TestIng
  import org.apache.accumulo.test.VerifyIngest;
  import org.apache.accumulo.test.VerifyIngest.VerifyParams;
  import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
  import org.junit.jupiter.api.AfterEach;
  import org.junit.jupiter.api.BeforeEach;
  import org.junit.jupiter.api.Test;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import com.google.common.base.Preconditions;
 +
+ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+ 
  public class SplitIT extends AccumuloClusterHarness {
    private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
  
@@@ -223,121 -213,71 +232,188 @@@
      }
    }
  
 +  @Test
 +  public void testLargeSplit() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = getUniqueNames(1)[0];
 +      c.tableOperations().create(tableName, new NewTableConfiguration()
 +          .setProperties(Map.of(Property.TABLE_MAX_END_ROW_SIZE.getKey(), 
"10K")));
 +
 +      byte[] okSplit = new byte[4096];
 +      for (int i = 0; i < okSplit.length; i++) {
 +        okSplit[i] = (byte) (i % 256);
 +      }
 +
 +      var splits1 = new TreeSet<Text>(List.of(new Text(okSplit)));
 +
 +      c.tableOperations().addSplits(tableName, splits1);
 +
 +      assertEquals(splits1, new 
TreeSet<>(c.tableOperations().listSplits(tableName)));
 +
 +      byte[] bigSplit = new byte[4096 * 4];
 +      for (int i = 0; i < bigSplit.length; i++) {
 +        bigSplit[i] = (byte) (i % 256);
 +      }
 +
 +      var splits2 = new TreeSet<Text>(List.of(new Text(bigSplit)));
 +      // split should fail because it exceeds the configured max split size
 +      assertThrows(AccumuloException.class,
 +          () -> c.tableOperations().addSplits(tableName, splits2));
 +
 +      // ensure the large split is not there
 +      assertEquals(splits1, new 
TreeSet<>(c.tableOperations().listSplits(tableName)));
 +    }
 +  }
 +
 +  @Test
 +  public void concurrentSplit() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      final String tableName = getUniqueNames(1)[0];
 +
 +      log.debug("Creating table {}", tableName);
 +      c.tableOperations().create(tableName);
 +
 +      final int numRows = 100_000;
 +      log.debug("Ingesting {} rows into {}", numRows, tableName);
 +      VerifyParams params = new VerifyParams(getClientProps(), tableName, 
numRows);
 +      TestIngest.ingest(c, params);
 +
 +      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
 +      VerifyIngest.verifyIngest(c, params);
 +
 +      log.debug("Creating futures that add random splits to the table");
 +      ExecutorService es = Executors.newFixedThreadPool(10);
 +      final int totalFutures = 100;
 +      final int splitsPerFuture = 4;
 +      final Set<Text> totalSplits = new HashSet<>();
 +      List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
 +      for (int i = 0; i < totalFutures; i++) {
 +        final Pair<Integer,Integer> splitBounds = 
getRandomSplitBounds(numRows);
 +        final TreeSet<Text> splits = 
TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
 +            splitBounds.getSecond().longValue(), splitsPerFuture);
 +        totalSplits.addAll(splits);
 +        tasks.add(() -> {
 +          c.tableOperations().addSplits(tableName, splits);
 +          return null;
 +        });
 +      }
 +
 +      log.debug("Submitting futures");
 +      List<Future<Void>> futures =
 +          tasks.parallelStream().map(es::submit).collect(Collectors.toList());
 +
 +      log.debug("Waiting for futures to complete");
 +      for (Future<?> f : futures) {
 +        f.get();
 +      }
 +      es.shutdown();
 +
 +      log.debug("Checking that {} splits were created ", totalSplits.size());
 +
 +      assertEquals(totalSplits, new 
HashSet<>(c.tableOperations().listSplits(tableName)),
 +          "Did not see expected splits");
 +
 +      // ELASTICITY_TODO the following could be removed after #3309. 
Currently scanning an ondemand
 +      // table with lots of tablets will cause the test to timeout.
 +      c.tableOperations().setTabletHostingGoal(tableName, new Range(), 
TabletHostingGoal.ALWAYS);
 +
 +      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
 +      VerifyIngest.verifyIngest(c, params);
 +    }
 +  }
 +
 +  /**
 +   * Generates a pair of integers that represent the start and end of a range 
of splits. The start
 +   * and end are randomly generated between 0 and upperBound. The start is 
guaranteed to be less
 +   * than the end and the two bounds are guaranteed to be different values.
 +   *
 +   * @param upperBound the upper bound of the range of splits
 +   * @return a pair of integers that represent the start and end of a range 
of splits
 +   */
 +  private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) {
 +    Preconditions.checkArgument(upperBound > 1, "upperBound must be greater 
than 1");
 +
 +    int start = random.nextInt(upperBound);
 +    int end = random.nextInt(upperBound - 1);
 +
 +    // ensure start is less than end and that end is not equal to start
 +    if (end >= start) {
 +      end += 1;
 +    } else {
 +      int tmp = start;
 +      start = end;
 +      end = tmp;
 +    }
 +
 +    return new Pair<>(start, end);
 +  }
 +
+   private String getDir() throws Exception {
+     var rootPath = getCluster().getTemporaryPath().toString();
+     String dir = rootPath + "/" + getUniqueNames(1)[0];
+     getCluster().getFileSystem().delete(new Path(dir), true);
+     return dir;
+   }
+ 
+   @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", 
"DMI_RANDOM_USED_ONLY_ONCE"},
+       justification = "predictable random with specific seed is intended for 
this test")
+   @Test
+   public void bulkImportThatCantSplitHangsCompaction() throws Exception {
+ 
+     /*
+      * There was a bug where a bulk import into a tablet with the following 
conditions would cause
+      * compactions to hang.
+      *
+      * 1. Tablet where the file sizes indicate it needs to split
+      *
+      * 2. Row with many columns in the tablet that is unsplittable
+      *
+      * This happened because the bulk import plus an attempted split would 
leave the tablet in a bad
+      * internal state for compactions.
+      */
+ 
+     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+       String tableName = getUniqueNames(1)[0];
+ 
+       c.tableOperations().create(tableName, new NewTableConfiguration()
+           
.setProperties(singletonMap(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K")));
+ 
+       Random random = new Random();
+       byte[] val = new byte[100];
+ 
+       String dir = getDir();
+       String file = dir + "/f1.rf";
+ 
+       // create a file with a single row and lots of columns. The file size 
will exceed the split
+       // threshold configured above.
+       try (
+           RFileWriter writer = 
RFile.newWriter().to(file).withFileSystem(getFileSystem()).build()) {
+         writer.startDefaultLocalityGroup();
+         for (int i = 0; i < 1000; i++) {
+           random.nextBytes(val);
+           writer.append(new Key("r1", "f1", String.format("%09d", i)),
+               new Value(Base64.getEncoder().encodeToString(val)));
+         }
+       }
+ 
+       // import the file
+       c.tableOperations().importDirectory(dir).to(tableName).load();
+ 
+       // tablet should not be able to split
+       assertEquals(0, c.tableOperations().listSplits(tableName).size());
+ 
+       Thread.sleep(1000);
+ 
+       c.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true));
+ 
+       // should have over 100K of data in the values
+       assertTrue(
+           c.createScanner(tableName).stream().mapToLong(entry -> 
entry.getValue().getSize()).sum()
+               > 100_000);
+ 
+       // should have 1000 entries
+       assertEquals(1000, c.createScanner(tableName).stream().count());
+     }
+   }
  }
diff --cc test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
index 1c8771fa52,bab15197eb..01298502d4
--- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
@@@ -117,7 -129,9 +128,8 @@@ public class KeywordStartIT 
      expectSet.put("admin", Admin.class);
      expectSet.put("check-compaction-config", CheckCompactionConfig.class);
      expectSet.put("check-server-config", CheckServerConfig.class);
 -    expectSet.put("compaction-coordinator", CoordinatorExecutable.class);
      expectSet.put("compactor", CompactorExecutable.class);
+     expectSet.put("create-empty", CreateEmpty.class);
      expectSet.put("create-token", CreateToken.class);
      expectSet.put("dump-zoo", DumpZookeeper.class);
      expectSet.put("ec-admin", ECAdmin.class);

Reply via email to