This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new 55c0d4ccee Improvements to ExternalCompactionsIT (#5297) 55c0d4ccee is described below commit 55c0d4ccee250b8ee8596d073eafeac2679febc3 Author: Dom G. <domgargu...@apache.org> AuthorDate: Tue Feb 4 10:18:16 2025 -0500 Improvements to ExternalCompactionsIT (#5297) --- .../compaction/ExternalCompactionTestUtils.java | 6 ++++ .../test/compaction/ExternalCompaction_1_IT.java | 39 ++++++++-------------- .../test/compaction/ExternalCompaction_2_IT.java | 22 ++++++------ 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index a0b470cadf..1b6981f679 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -99,6 +99,12 @@ public class ExternalCompactionTestUtils { .filter(state -> state.getExtent().tableId().equals(tid)); } + public static long getFinalStateForTableCount(AccumuloCluster cluster, TableId tid) { + try (var finalStatesForTable = getFinalStatesForTable(cluster, tid)) { + return finalStatesForTable.count(); + } + } + public static void compact(final AccumuloClient client, String table1, int modulus, String expectedQueue, boolean wait) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 695c97ebbf..7bf53e10ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -31,6 +31,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QU import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; @@ -56,7 +57,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; @@ -90,20 +90,19 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterEach; @@ -245,12 +244,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { getCluster().getClusterControl().stop(ServerType.COMPACTOR); // DeadCompactionDetector in the CompactionCoordinator should fail the compaction. - long count = 0; - while (count == 0) { - count = getFinalStatesForTable(getCluster(), tid) - .filter(state -> state.getFinalState().equals(FinalState.FAILED)).count(); - UtilWaitThread.sleep(250); - } + Wait.waitFor(() -> getFinalStateForTableCount(getCluster(), tid) > 0); // We need to cancel the compaction or delete the table here because we initiate a user // compaction above in the test. Even though the external compaction was cancelled @@ -411,12 +405,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { // metadata table entries to show up. LOG.info("Waiting for external compaction to complete."); TableId tid = getCluster().getServerContext().getTableId(table3); - Stream<ExternalCompactionFinalState> fs = getFinalStatesForTable(getCluster(), tid); - while (fs.findAny().isEmpty()) { + Wait.waitFor(() -> { LOG.info("Waiting for compaction completed marker to appear"); - UtilWaitThread.sleep(250); - fs = getFinalStatesForTable(getCluster(), tid); - } + return getFinalStateForTableCount(getCluster(), tid) > 0; + }, 120_000, 250); LOG.info("Validating metadata table contents."); try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() @@ -424,11 +416,11 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { TabletMetadata m = tm.stream().collect(onlyElement()); Map<ExternalCompactionId,ExternalCompactionMetadata> em = m.getExternalCompactions(); assertEquals(1, em.size()); - List<ExternalCompactionFinalState> finished = new ArrayList<>(); - getFinalStatesForTable(getCluster(), tid).forEach(f -> finished.add(f)); - assertEquals(1, finished.size()); - assertEquals(em.entrySet().iterator().next().getKey(), - finished.get(0).getExternalCompactionId()); + try (var finalStates = getFinalStatesForTable(getCluster(), tid) + .map(ExternalCompactionFinalState::getExternalCompactionId)) { + ExternalCompactionId actual = finalStates.collect(onlyElement()); + assertEquals(em.entrySet().iterator().next().getKey(), actual); + } } // Force a flush on the metadata table before killing our tserver @@ -445,13 +437,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { getCluster().getClusterControl().start(ServerType.TABLET_SERVER); // Wait for the compaction to be committed. - LOG.info("Waiting for compaction completed marker to disappear"); - Stream<ExternalCompactionFinalState> fs2 = getFinalStatesForTable(getCluster(), tid); - while (fs2.findAny().isPresent()) { + Wait.waitFor(() -> { LOG.info("Waiting for compaction completed marker to disappear"); - UtilWaitThread.sleep(500); - fs2 = getFinalStatesForTable(getCluster(), tid); - } + return getFinalStateForTableCount(getCluster(), tid) == 0; + }, 120_000, 500); verify(client, table3, 2); // We need to cancel the compaction or delete the table here because we initiate a user diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index 3b902a5eab..b10c372cb3 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -28,7 +28,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.co import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids; @@ -60,6 +60,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterEach; @@ -165,8 +166,8 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { .startCoordinator(TestCompactionCoordinatorForOfflineTable.class); TableId tid = getCluster().getServerContext().getTableId(table1); - // Confirm that no final state is in the metadata table - assertEquals(0, getFinalStatesForTable(getCluster(), tid).count()); + assertEquals(0, getFinalStateForTableCount(getCluster(), tid), + "Expected no final state is in the metadata table"); // Offline the table when the compaction starts final AtomicBoolean succeededInTakingOffline = new AtomicBoolean(false); @@ -209,20 +210,17 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { confirmCompactionCompleted(getCluster().getServerContext(), ecids, TCompactionState.SUCCEEDED); - // Confirm that final state is in the metadata table - assertEquals(1, getFinalStatesForTable(getCluster(), tid).count()); + assertEquals(1, getFinalStateForTableCount(getCluster(), tid), + "Expected 1 final state in the metadata table"); // Online the table client.tableOperations().online(table1); // wait for compaction to be committed by tserver or test timeout - long finalStateCount = getFinalStatesForTable(getCluster(), tid).count(); - while (finalStateCount > 0) { - finalStateCount = getFinalStatesForTable(getCluster(), tid).count(); - if (finalStateCount > 0) { - UtilWaitThread.sleep(50); - } - } + Wait.waitFor(() -> { + LOG.info("Waiting for compaction completed marker to disappear"); + return getFinalStateForTableCount(getCluster(), tid) == 0; + }, 120_000, 500); // We need to cancel the compaction or delete the table here because we initiate a user // compaction above in the test. Even though the external compaction was cancelled