This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d17a99733 Improve BulkSplitOptimizationIT (#5179)
8d17a99733 is described below

commit 8d17a99733ad5070bf8f733ad4c8775280ce8c2e
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Fri Jan 17 12:16:51 2025 -0500

    Improve BulkSplitOptimizationIT (#5179)
    
    * Improve BulkSplitOptimizationIT
    * Add timing to test logging
---
 .../test/functional/BulkSplitOptimizationIT.java   | 94 ++++++++++++++++------
 1 file changed, 71 insertions(+), 23 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index de2ed8475f..bd29f6e10e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -18,49 +18,81 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This test verifies that when a lot of files are bulk imported into a table 
with one tablet and
  * then splits that not all data files go to the children tablets.
  */
 public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
+  private static final Logger log = 
LoggerFactory.getLogger(BulkSplitOptimizationIT.class);
+
+  Path testDir;
 
   @Override
   protected Duration defaultTimeout() {
-    return Duration.ofMinutes(2);
+    return Duration.ofMinutes(5);
   }
 
   @BeforeEach
   public void alterConfig() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      final int initialTserverCount =
+          
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
+      log.info("Tserver count: {}", initialTserverCount);
+      Timer timer = Timer.startNew();
       getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      Wait.waitFor(
+          () -> 
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(),
+          120_000);
+      log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS));
+      timer.restart();
       getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+      Wait.waitFor(() -> 
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
+          == initialTserverCount, 120_000);
+      log.info("Took {} ms to start all tservers", 
timer.elapsed(MILLISECONDS));
+
+      FileSystem fs = cluster.getFileSystem();
+      testDir = new Path(cluster.getTemporaryPath(), "testmf");
+      fs.deleteOnExit(testDir);
+
+      timer.restart();
+      FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, 
SPLITS, 8);
+      long elapsed = timer.elapsed(MILLISECONDS);
+      FileStatus[] stats = fs.listStatus(testDir);
+      log.info("Generated {} files in {} ms", stats.length, elapsed);
     }
   }
 
   @AfterEach
   public void resetConfig() throws Exception {
-    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
-      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
-      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
-    }
+    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
   }
 
   private static final int ROWS = 100000;
@@ -68,35 +100,51 @@ public class BulkSplitOptimizationIT extends 
AccumuloClusterHarness {
 
   @Test
   public void testBulkSplitOptimization() throws Exception {
+    log.info("Starting BulkSplitOptimizationIT test");
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
-      final String tableName = getUniqueNames(1)[0];
-      c.tableOperations().create(tableName);
-      c.tableOperations().setProperty(tableName, 
Property.TABLE_MAJC_RATIO.getKey(), "1000");
-      c.tableOperations().setProperty(tableName, 
Property.TABLE_FILE_MAX.getKey(), "1000");
-      c.tableOperations().setProperty(tableName, 
Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
-      FileSystem fs = cluster.getFileSystem();
-      Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
-      fs.deleteOnExit(testDir);
-      FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, 
SPLITS, 8);
-      FileStatus[] stats = fs.listStatus(testDir);
 
-      System.out.println("Number of generated files: " + stats.length);
+      final String tableName = getUniqueNames(1)[0];
+      Map<String,String> tableProps = new HashMap<>();
+      tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000");
+      tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000");
+      tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
+
+      log.info("Creating table {}", tableName);
+      Timer timer = Timer.startNew();
+      c.tableOperations().create(tableName, new 
NewTableConfiguration().setProperties(tableProps)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      log.info("Created table in {} ms. Starting bulk import", 
timer.elapsed(MILLISECONDS));
+
+      timer.restart();
       
c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();
+      log.info("Imported into table {} in {} ms", tableName, 
timer.elapsed(MILLISECONDS));
 
+      timer.restart();
       FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
       FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
+      log.info("Checked splits and rfiles in {} ms", 
timer.elapsed(MILLISECONDS));
 
-      // initiate splits
+      log.info("Lowering split threshold to 100K to initiate splits");
       c.tableOperations().setProperty(tableName, 
Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
 
-      Thread.sleep(SECONDS.toMillis(2));
+      timer.restart();
 
       // wait until over split threshold -- should be 78 splits
-      while (c.tableOperations().listSplits(tableName).size() < 50) {
-        Thread.sleep(500);
-      }
+      Wait.waitFor(() -> {
+        try {
+          FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
+        } catch (Exception e) {
+          if (e.getMessage().contains("splits points out of range")) {
+            return false;
+          } else {
+            throw e;
+          }
+        }
+        return true;
+      });
+
+      log.info("Took {} ms for split count to reach expected range", 
timer.elapsed(MILLISECONDS));
 
-      FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
       VerifyParams params = new VerifyParams(getClientProps(), tableName, 
ROWS);
       params.timestamp = 1;
       params.dataSize = 50;

Reply via email to