This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 55c0d4ccee Improvements to ExternalCompactionsIT (#5297)
55c0d4ccee is described below
commit 55c0d4ccee250b8ee8596d073eafeac2679febc3
Author: Dom G. <[email protected]>
AuthorDate: Tue Feb 4 10:18:16 2025 -0500
Improvements to ExternalCompactionsIT (#5297)
---
.../compaction/ExternalCompactionTestUtils.java | 6 ++++
.../test/compaction/ExternalCompaction_1_IT.java | 39 ++++++++--------------
.../test/compaction/ExternalCompaction_2_IT.java | 22 ++++++------
3 files changed, 30 insertions(+), 37 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index a0b470cadf..1b6981f679 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -99,6 +99,12 @@ public class ExternalCompactionTestUtils {
.filter(state -> state.getExtent().tableId().equals(tid));
}
+ public static long getFinalStateForTableCount(AccumuloCluster cluster,
TableId tid) {
+ try (var finalStatesForTable = getFinalStatesForTable(cluster, tid)) {
+ return finalStatesForTable.count();
+ }
+ }
+
public static void compact(final AccumuloClient client, String table1, int
modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException,
AccumuloException {
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 695c97ebbf..7bf53e10ca 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -31,6 +31,7 @@ import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QU
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE8;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
@@ -56,7 +57,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
@@ -90,20 +90,19 @@ import
org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
-import
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterEach;
@@ -245,12 +244,7 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
// DeadCompactionDetector in the CompactionCoordinator should fail the
compaction.
- long count = 0;
- while (count == 0) {
- count = getFinalStatesForTable(getCluster(), tid)
- .filter(state ->
state.getFinalState().equals(FinalState.FAILED)).count();
- UtilWaitThread.sleep(250);
- }
+ Wait.waitFor(() -> getFinalStateForTableCount(getCluster(), tid) > 0);
// We need to cancel the compaction or delete the table here because we
initiate a user
// compaction above in the test. Even though the external compaction was
cancelled
@@ -411,12 +405,10 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
// metadata table entries to show up.
LOG.info("Waiting for external compaction to complete.");
TableId tid = getCluster().getServerContext().getTableId(table3);
- Stream<ExternalCompactionFinalState> fs =
getFinalStatesForTable(getCluster(), tid);
- while (fs.findAny().isEmpty()) {
+ Wait.waitFor(() -> {
LOG.info("Waiting for compaction completed marker to appear");
- UtilWaitThread.sleep(250);
- fs = getFinalStatesForTable(getCluster(), tid);
- }
+ return getFinalStateForTableCount(getCluster(), tid) > 0;
+ }, 120_000, 250);
LOG.info("Validating metadata table contents.");
try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
@@ -424,11 +416,11 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
TabletMetadata m = tm.stream().collect(onlyElement());
Map<ExternalCompactionId,ExternalCompactionMetadata> em =
m.getExternalCompactions();
assertEquals(1, em.size());
- List<ExternalCompactionFinalState> finished = new ArrayList<>();
- getFinalStatesForTable(getCluster(), tid).forEach(f ->
finished.add(f));
- assertEquals(1, finished.size());
- assertEquals(em.entrySet().iterator().next().getKey(),
- finished.get(0).getExternalCompactionId());
+ try (var finalStates = getFinalStatesForTable(getCluster(), tid)
+ .map(ExternalCompactionFinalState::getExternalCompactionId)) {
+ ExternalCompactionId actual = finalStates.collect(onlyElement());
+ assertEquals(em.entrySet().iterator().next().getKey(), actual);
+ }
}
// Force a flush on the metadata table before killing our tserver
@@ -445,13 +437,10 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
// Wait for the compaction to be committed.
- LOG.info("Waiting for compaction completed marker to disappear");
- Stream<ExternalCompactionFinalState> fs2 =
getFinalStatesForTable(getCluster(), tid);
- while (fs2.findAny().isPresent()) {
+ Wait.waitFor(() -> {
LOG.info("Waiting for compaction completed marker to disappear");
- UtilWaitThread.sleep(500);
- fs2 = getFinalStatesForTable(getCluster(), tid);
- }
+ return getFinalStateForTableCount(getCluster(), tid) == 0;
+ }, 120_000, 500);
verify(client, table3, 2);
// We need to cancel the compaction or delete the table here because we
initiate a user
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index 3b902a5eab..b10c372cb3 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -28,7 +28,7 @@ import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.co
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
@@ -60,6 +60,7 @@ import
org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterEach;
@@ -165,8 +166,8 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
.startCoordinator(TestCompactionCoordinatorForOfflineTable.class);
TableId tid = getCluster().getServerContext().getTableId(table1);
- // Confirm that no final state is in the metadata table
- assertEquals(0, getFinalStatesForTable(getCluster(), tid).count());
+ assertEquals(0, getFinalStateForTableCount(getCluster(), tid),
+ "Expected no final state is in the metadata table");
// Offline the table when the compaction starts
final AtomicBoolean succeededInTakingOffline = new AtomicBoolean(false);
@@ -209,20 +210,17 @@ public class ExternalCompaction_2_IT extends
SharedMiniClusterBase {
confirmCompactionCompleted(getCluster().getServerContext(), ecids,
TCompactionState.SUCCEEDED);
- // Confirm that final state is in the metadata table
- assertEquals(1, getFinalStatesForTable(getCluster(), tid).count());
+ assertEquals(1, getFinalStateForTableCount(getCluster(), tid),
+ "Expected 1 final state in the metadata table");
// Online the table
client.tableOperations().online(table1);
// wait for compaction to be committed by tserver or test timeout
- long finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
- while (finalStateCount > 0) {
- finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
- if (finalStateCount > 0) {
- UtilWaitThread.sleep(50);
- }
- }
+ Wait.waitFor(() -> {
+ LOG.info("Waiting for compaction completed marker to disappear");
+ return getFinalStateForTableCount(getCluster(), tid) == 0;
+ }, 120_000, 500);
// We need to cancel the compaction or delete the table here because we
initiate a user
// compaction above in the test. Even though the external compaction was
cancelled