This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 8d17a99733 Improve BulkSplitOptimizationIT (#5179)
8d17a99733 is described below
commit 8d17a99733ad5070bf8f733ad4c8775280ce8c2e
Author: Dom G. <[email protected]>
AuthorDate: Fri Jan 17 12:16:51 2025 -0500
Improve BulkSplitOptimizationIT (#5179)
* Improve BulkSplitOptimizationIT
* Add timing to test logging
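
The change replaces fixed sleeps and ad-hoc polling with one repeated idiom: start a Timer, run the slow step, block on Wait.waitFor until the step's effect is visible, then log the elapsed milliseconds. A minimal sketch of that idiom, assuming the client, log, and getClusterControl() that are in scope in the test below (the helper name timedTserverStop is hypothetical; Timer and Wait are the Accumulo utilities used in the diff):

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.admin.servers.ServerId;
    import org.apache.accumulo.core.util.Timer;
    import org.apache.accumulo.minicluster.ServerType;
    import org.apache.accumulo.test.util.Wait;

    // Hypothetical helper showing the timing idiom this commit applies.
    void timedTserverStop(AccumuloClient client) throws Exception {
      Timer timer = Timer.startNew();                               // mark the start
      getClusterControl().stopAllServers(ServerType.TABLET_SERVER); // the slow step
      // Poll (up to 120s) until the instance no longer reports any tablet servers.
      Wait.waitFor(
          () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(),
          120_000);
      log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS));
    }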
---
.../test/functional/BulkSplitOptimizationIT.java | 94 ++++++++++++++++------
1 file changed, 71 insertions(+), 23 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index de2ed8475f..bd29f6e10e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -18,49 +18,81 @@
*/
package org.apache.accumulo.test.functional;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * This test verifies that when a lot of files are bulk imported into a table with one tablet and
  * then splits that not all data files go to the children tablets.
  */
public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class);
+
+  Path testDir;
@Override
protected Duration defaultTimeout() {
- return Duration.ofMinutes(2);
+ return Duration.ofMinutes(5);
}
@BeforeEach
public void alterConfig() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      final int initialTserverCount =
+          client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
+      log.info("Tserver count: {}", initialTserverCount);
+      Timer timer = Timer.startNew();
       getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      Wait.waitFor(
+          () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(),
+          120_000);
+      log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS));
+      timer.restart();
       getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+      Wait.waitFor(() -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)
+          .size() == initialTserverCount, 120_000);
+ log.info("Took {} ms to start all tservers",
timer.elapsed(MILLISECONDS));
+
+ FileSystem fs = cluster.getFileSystem();
+ testDir = new Path(cluster.getTemporaryPath(), "testmf");
+ fs.deleteOnExit(testDir);
+
+ timer.restart();
+ FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS,
SPLITS, 8);
+ long elapsed = timer.elapsed(MILLISECONDS);
+ FileStatus[] stats = fs.listStatus(testDir);
+ log.info("Generated {} files in {} ms", stats.length, elapsed);
}
}
@AfterEach
public void resetConfig() throws Exception {
-    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
- getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
- getClusterControl().startAllServers(ServerType.TABLET_SERVER);
- }
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
}
private static final int ROWS = 100000;
@@ -68,35 +100,51 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
@Test
public void testBulkSplitOptimization() throws Exception {
+ log.info("Starting BulkSplitOptimizationIT test");
     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
- final String tableName = getUniqueNames(1)[0];
- c.tableOperations().create(tableName);
-      c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
-      c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
-      c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
- FileSystem fs = cluster.getFileSystem();
- Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
- fs.deleteOnExit(testDir);
-      FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
- FileStatus[] stats = fs.listStatus(testDir);
- System.out.println("Number of generated files: " + stats.length);
+      final String tableName = getUniqueNames(1)[0];
+      Map<String,String> tableProps = new HashMap<>();
+      tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000");
+      tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000");
+      tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
+
+      log.info("Creating table {}", tableName);
+      Timer timer = Timer.startNew();
+      c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED));
+      log.info("Created table in {} ms. Starting bulk import", timer.elapsed(MILLISECONDS));
+
+      timer.restart();
       c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();
+      log.info("Imported into table {} in {} ms", tableName, timer.elapsed(MILLISECONDS));
+      timer.restart();
FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
+ log.info("Checked splits and rfiles in {} ms",
timer.elapsed(MILLISECONDS));
- // initiate splits
+ log.info("Lowering split threshold to 100K to initiate splits");
c.tableOperations().setProperty(tableName,
Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
- Thread.sleep(SECONDS.toMillis(2));
+ timer.restart();
// wait until over split threshold -- should be 78 splits
- while (c.tableOperations().listSplits(tableName).size() < 50) {
- Thread.sleep(500);
- }
+      Wait.waitFor(() -> {
+        try {
+          FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
+        } catch (Exception e) {
+          if (e.getMessage().contains("splits points out of range")) {
+            return false;
+          } else {
+            throw e;
+          }
+        }
+        return true;
+      });
+
+ log.info("Took {} ms for split count to reach expected range",
timer.elapsed(MILLISECONDS));
- FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
       VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS);
params.timestamp = 1;
params.dataSize = 50;