This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7514b7c53c6fee87896badec387158ba85b8a4c8 Author: Jacob Barrett <[email protected]> AuthorDate: Fri Apr 8 14:50:11 2022 -0700 GEODE-8228: Await on stats. --- .../geode/internal/cache/wan/WANTestBase.java | 81 ++- ...rallelGatewaySenderAlertThresholdDUnitTest.java | 41 +- .../ParallelGatewaySenderOperationsDUnitTest.java | 22 +- .../parallel/ParallelWANConflationDUnitTest.java | 101 ++- .../wan/parallel/ParallelWANStatsDUnitTest.java | 745 +++++++++++---------- .../serial/SerialGatewaySenderQueueDUnitTest.java | 14 +- .../wan/serial/SerialWANConflationDUnitTest.java | 54 +- .../cache/wan/serial/SerialWANStatsDUnitTest.java | 42 +- 8 files changed, 544 insertions(+), 556 deletions(-) diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 62f4ca9749..16ea5f6b91 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1159,11 +1159,8 @@ public class WANTestBase extends DistributedTestCase { public static void checkQueueSizeInStats(String senderId, final int expectedQueueSize) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); GatewaySenderStats statistics = sender.getStatistics(); - await() - .untilAsserted(() -> assertThat(statistics.getEventQueueSize()).as( - "Expected queue size: " + expectedQueueSize - + " but actual size: " + statistics.getEventQueueSize()) - .isEqualTo(expectedQueueSize)); + await().untilAsserted(() -> assertThat(statistics.getEventQueueSize()) + .isEqualTo(expectedQueueSize)); } public static void checkConnectionStats(String senderId) { @@ -1186,11 +1183,7 @@ public class WANTestBase extends DistributedTestCase { (ConcurrentParallelGatewaySenderQueue) regionQueue; parallelGatewaySenderQueue.getRegions(); } - await() - .untilAsserted(() -> assertThat(regionQueue.size()).as( - "Expected queue entries: " + expectedQueueSize - + " but actual entries: " + regionQueue.size()) - .isEqualTo(expectedQueueSize)); + await().untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize)); } ArrayList<Integer> stats = new ArrayList<>(); stats.add(statistics.getEventQueueSize()); @@ -1237,10 +1230,12 @@ public class WANTestBase extends DistributedTestCase { public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived, final int eventsQueued, final int eventsDistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize); - assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived); - assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued); - assert (statistics.getEventsDistributed() >= eventsDistributed); + await().untilAsserted(() -> { + assertThat(statistics.getEventQueueSize()).isEqualTo(queueSize); + assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived); + assertThat(statistics.getEventsQueued()).isEqualTo(eventsQueued); + assertThat(statistics.getEventsDistributed()).isGreaterThanOrEqualTo(eventsDistributed); + }); } public static void checkGatewayReceiverStats(int processBatches, int eventsReceived, @@ -1322,19 +1317,23 @@ public class WANTestBase extends DistributedTestCase { public static void checkEventFilteredStats(String senderId, final int eventsFiltered) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered); + await() + .untilAsserted(() -> assertThat(statistics.getEventsFiltered()).isEqualTo(eventsFiltered)); } public static void checkConflatedStats(String senderId, final int eventsConflated) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated); + await().untilAsserted( + () -> assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated)); } public static void checkStats_Failover(String senderId, final int eventsReceived) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived); - assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary() - + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived); + await().untilAsserted(() -> { + assertThat(statistics.getEventsReceived()).isEqualTo(eventsReceived); + assertThat((statistics.getEventsQueued() + statistics.getUnprocessedTokensAddedByPrimary() + + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived); + }); } public static void checkBatchStats(String senderId, final int batches) { @@ -1343,38 +1342,46 @@ public class WANTestBase extends DistributedTestCase { public static void checkBatchStats(String senderId, final int batches, boolean isExact) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - if (isExact) { - assert (statistics.getBatchesDistributed() == batches); - } else { - assert (statistics.getBatchesDistributed() >= batches); - } - assertThat(statistics.getBatchesRedistributed()).isEqualTo(0); + await().untilAsserted(() -> { + if (isExact) { + assertThat(statistics.getBatchesDistributed()).isEqualTo(batches); + } else { + assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches); + } + assertThat(statistics.getBatchesRedistributed()).isEqualTo(0); + }); } public static void checkBatchStats(String senderId, final int batches, boolean isExact, final boolean batchesRedistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - if (isExact) { - assert (statistics.getBatchesDistributed() == batches); - } else { - assert (statistics.getBatchesDistributed() >= batches); - } - assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed); + await().untilAsserted(() -> { + if (isExact) { + assertThat(statistics.getBatchesDistributed()).isEqualTo(batches); + } else { + assertThat(statistics.getBatchesDistributed()).isGreaterThanOrEqualTo(batches); + } + assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed); + }); } public static void checkBatchStats(String senderId, final boolean batchesDistributed, final boolean batchesRedistributed) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed); - assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed); + await().untilAsserted(() -> { + assertThat((statistics.getBatchesDistributed() > 0)).isEqualTo(batchesDistributed); + assertThat((statistics.getBatchesRedistributed() > 0)).isEqualTo(batchesRedistributed); + }); } public static void checkUnProcessedStats(String senderId, int events) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertThat((statistics.getUnprocessedEventsAddedBySecondary() - + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events); - assertThat((statistics.getUnprocessedEventsRemovedByPrimary() - + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events); + await().untilAsserted(() -> { + assertThat((statistics.getUnprocessedEventsAddedBySecondary() + + statistics.getUnprocessedTokensRemovedBySecondary())).isEqualTo(events); + assertThat((statistics.getUnprocessedEventsRemovedByPrimary() + + statistics.getUnprocessedTokensAddedByPrimary())).isEqualTo(events); + }); } public static GatewaySenderStats getGatewaySenderStats(String senderId) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java index 2b0c332e4a..f7e2f77f85 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderAlertThresholdDUnitTest.java @@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; +import java.util.List; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -86,17 +86,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase { vm2.invoke(serializableRunnableIF); vm3.invoke(serializableRunnableIF); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as( - "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0").isTrue(); + assertThat((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12)) > 0).as( + "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold > 0") + .isTrue(); + }); int v4alert = vm4.invoke( ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold); @@ -161,18 +160,16 @@ public class ParallelGatewaySenderAlertThresholdDUnitTest extends WANTestBase { vm2.invoke(serializableRunnableIF); vm3.invoke(serializableRunnableIF); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - assertThat(0).as( - "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0") - .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12))); + assertThat(0).as( + "GatewaySenders Stats should contain number of EventsExceedingAlertThreshold = 0") + .isEqualTo((v4List.get(12) + v5List.get(12) + v6List.get(12) + v7List.get(12))); + }); int v4alert = vm4.invoke( ParallelGatewaySenderAlertThresholdDUnitTest::checkSenderMBeanAlertThreshold); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index a9fe2e04c3..78e2aa2360 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -479,14 +479,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); async.join(); - ArrayList<Integer> vm4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - ArrayList<Integer> vm5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - ArrayList<Integer> vm6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - ArrayList<Integer> vm7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); + List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) { foundEventsDroppedDueToPrimarySenderNotRunning = true; } @@ -1258,12 +1254,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { .isEqualTo(100); await().untilAsserted(() -> { - int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln")); - assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize) - .isEqualTo(0); + assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); }); } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java index e7dafb0e95..88fa8a43b3 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -17,8 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; @@ -120,26 +120,26 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm6.invoke(() -> enableConflation("ln")); vm7.invoke(() -> enableConflation("ln")); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100)); - assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as( - "Event in secondary queue should be 100").isTrue(); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100)); + assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100).as( + "Event in secondary queue should be 100").isTrue(); + }); resumeSenders(); - v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as( - "No events conflated in batch").isTrue(); + assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as( + "No events conflated in batch").isTrue(); + }); verifySecondaryEventQueuesDrained(); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); @@ -148,15 +148,10 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { private void verifySecondaryEventQueuesDrained() { await().untilAsserted(() -> { - int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln")); - int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln")); - - assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize).as( - "Event in secondary queue should be 0 after dispatched, but actual is " + vm4SecondarySize - + ":" + vm5SecondarySize + ":" + vm6SecondarySize + ":" + vm7SecondarySize) - .isEqualTo(0); + assertThat(vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm6.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); + assertThat(vm7.invoke(() -> getSecondaryQueueSizeInStats("ln"))).isZero(); }); } @@ -217,38 +212,34 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { } private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) { - ArrayList<Integer> vm4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); - ArrayList<Integer> vm5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); - ArrayList<Integer> vm6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); - ArrayList<Integer> vm7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); - assertThat( - (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum - * redundancy).as( - "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is " - + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10))) - .isTrue(); + await().untilAsserted(() -> { + List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum)); + assertThat( + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum + * redundancy).as( + "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is " + + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10))) + .isTrue(); + }); } private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) { - ArrayList<Integer> vm4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> vm5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> vm6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> vm7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - assertThat( - (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum - * redundancy).as( - "Event processed by queue removal message should be " + (expectedNum * redundancy) - + ", but is " - + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11))) - .isTrue(); + await().untilAsserted(() -> { + List<Integer> vm4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> vm5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> vm6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> vm7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + assertThat( + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum + * redundancy).as( + "Event processed by queue removal message should be " + (expectedNum * redundancy) + + ", but is " + + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11))) + .isTrue(); + }); } @Test diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java index 79f7afb438..ce7eb03b8b 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -102,38 +102,41 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { createReceiverPR(vm2, 1); putKeyValues(); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); // queue - // size - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo( - NUM_PUTS * 2); // eventsReceived - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo( - NUM_PUTS * 2); // events queued - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); // events - // distributed - assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( - NUM_PUTS); // secondary queue size + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + // queue size + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); + // eventsReceived + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo( + NUM_PUTS * 2); + // events queued + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo( + NUM_PUTS * 2); + // events distributed + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); + // secondary queue size + assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( + NUM_PUTS); + + System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + + ":" + v6List.get(10) + ":" + v7List.get(10)); + }); // stop vm7 to trigger rebalance and move some primary buckets - System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) - + ":" + v6List.get(10) + ":" + v7List.get(10)); vm7.invoke(WANTestBase::closeCache); await().untilAsserted(() -> { int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln")); assertThat(v4secondarySize + v5secondarySize + v6secondarySize).isEqualTo(NUM_PUTS); + System.out + .println("New secondary queue sizes:" + v4secondarySize + ":" + v5secondarySize + ":" + + v6secondarySize); }); - System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":" - + v6List.get(10)); vm7.invoke(() -> WANTestBase.createCache(lnPort)); vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true)); @@ -141,16 +144,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { startSenderInVMs("ln", vm7); vm7.invoke(() -> pauseSender("ln")); - v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( - NUM_PUTS); // secondary - // queue - // size - System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":" - + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10)); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + // secondary queue size + assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( + NUM_PUTS); + System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":" + + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10)); + }); vm4.invoke(() -> WANTestBase.resumeSender("ln")); vm5.invoke(() -> WANTestBase.resumeSender("ln")); @@ -162,15 +166,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); - v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); - // secondary queue size: - assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); + // secondary queue size: + assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0); + }); } // TODO: add a test without redundancy for primary switch @@ -192,28 +198,26 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { putKeyValues(); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); - // events received: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo( - NUM_PUTS * 2); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo( - NUM_PUTS * 2); - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); - // secondary queue size: - assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( - NUM_PUTS); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); + // events received: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo( + NUM_PUTS * 2); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo( + NUM_PUTS * 2); + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); + // secondary queue size: + assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo( + NUM_PUTS); + }); vm4.invoke(() -> WANTestBase.resumeSender("ln")); vm5.invoke(() -> WANTestBase.resumeSender("ln")); @@ -224,15 +228,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); - v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); - // secondary queue size: - assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); + // secondary queue size: + assertThat(v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)).isEqualTo(0); + }); } @Test @@ -254,27 +260,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { putKeyValues(); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); - // events received: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS); - // events distributed - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0); - // batches redistributed - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(NUM_PUTS); + // events received: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS); + // events distributed + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(0); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(0); + // batches redistributed + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + }); } @Test @@ -297,27 +301,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // eventsReceived: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS); - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // eventsReceived: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(NUM_PUTS); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(NUM_PUTS); + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + }); vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); } @@ -369,27 +371,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // eventsReceived: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries); - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // eventsReceived: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries); + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(2); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + }); } @Test @@ -496,26 +496,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated( boolean isBatchesRedistributed) { - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // batches redistributed: - int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5); - if (isBatchesRedistributed) { - assertThat(batchesRedistributed).isGreaterThan(0); - } else { - assertThat(batchesRedistributed).isEqualTo(0); - } - // batches with incomplete transactions - assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // batches redistributed: + int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5); + if (isBatchesRedistributed) { + assertThat(batchesRedistributed).isGreaterThan(0); + } else { + assertThat(batchesRedistributed).isEqualTo(0); + } + // batches with incomplete transactions + assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(7)).isEqualTo(0); + }); vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); @@ -577,31 +575,29 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // eventsReceived: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries); - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); - // events not queued conflated: - assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0); - // batches with incomplete transactions - assertThat((int) v4List.get(13)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // eventsReceived: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(entries); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(entries); + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(entries); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)).isEqualTo(1); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + // events not queued conflated: + assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(0); + // batches with incomplete transactions + assertThat((int) v4List.get(13)).isEqualTo(0); + }); } @@ -641,29 +637,31 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - // The number of batches will be 4 because each // dispatcher thread (there are 2) will send half the number of entries, // each on 2 batches. - int batches = 4; - // queue size: - assertThat((int) v4List.get(0)).isEqualTo(0); - // eventsReceived: - assertThat((int) v4List.get(1)).isEqualTo(entries); - // events queued: - assertThat((int) v4List.get(2)).isEqualTo(entries); - // events distributed: - assertThat((int) v4List.get(3)).isEqualTo(entries); - // batches distributed: - assertThat((int) v4List.get(4)).isEqualTo(batches); - // batches redistributed: - assertThat((int) v4List.get(5)).isEqualTo(0); - // events not queued conflated: - assertThat((int) v4List.get(7)).isEqualTo(0); - // batches with incomplete transactions - assertThat((int) v4List.get(13)).isEqualTo(batches); + final int batches = 4; + + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat((int) v4List.get(0)).isEqualTo(0); + // eventsReceived: + assertThat((int) v4List.get(1)).isEqualTo(entries); + // events queued: + assertThat((int) v4List.get(2)).isEqualTo(entries); + // events distributed: + assertThat((int) v4List.get(3)).isEqualTo(entries); + // batches distributed: + assertThat((int) v4List.get(4)).isEqualTo(batches); + // batches redistributed: + assertThat((int) v4List.get(5)).isEqualTo(0); + // events not queued conflated: + assertThat((int) v4List.get(7)).isEqualTo(0); + // batches with incomplete transactions + assertThat((int) v4List.get(13)).isEqualTo(batches); + }); vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(batches, entries, entries)); } @@ -718,21 +716,22 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { createReceiverInVMs(vm2); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat((int) v4List.get(0)).isEqualTo(0); - // events received: - assertThat((int) v4List.get(1)).isEqualTo(entries); - // events queued: - assertThat((int) v4List.get(2)).isEqualTo(entries); - // events distributed: - assertThat((int) v4List.get(3)).isEqualTo(entries); - // batches distributed: - assertThat((int) v4List.get(4)).isEqualTo(3); - // batches redistributed: - assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue(); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat((int) v4List.get(0)).isEqualTo(0); + // events received: + assertThat((int) v4List.get(1)).isEqualTo(entries); + // events queued: + assertThat((int) v4List.get(2)).isEqualTo(entries); + // events distributed: + assertThat((int) v4List.get(3)).isEqualTo(entries); + // batches distributed: + assertThat((int) v4List.get(4)).isEqualTo(3); + // batches redistributed: + assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue(); + }); } @Test @@ -793,23 +792,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { createReceiverInVMs(vm2); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat((int) v4List.get(0)).isEqualTo(0); - // events received: - assertThat((int) v4List.get(1)).isEqualTo(entries); - // events queued: - assertThat((int) v4List.get(2)).isEqualTo(entries); - // events distributed: - assertThat((int) v4List.get(3)).isEqualTo(entries); - // batches distributed: - assertThat((int) v4List.get(4)).isEqualTo(2); - // batches redistributed: - assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue(); - // events not queued conflated: - assertThat((int) v4List.get(7)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat((int) v4List.get(0)).isEqualTo(0); + // events received: + assertThat((int) v4List.get(1)).isEqualTo(entries); + // events queued: + assertThat((int) v4List.get(2)).isEqualTo(entries); + // events distributed: + assertThat((int) v4List.get(3)).isEqualTo(entries); + // batches distributed: + assertThat((int) v4List.get(4)).isEqualTo(2); + // batches redistributed: + assertThat((v4List.get(5)) > 0).as("Batch was not redistributed").isTrue(); + // events not queued conflated: + assertThat((int) v4List.get(7)).isEqualTo(0); + }); } @Test @@ -832,27 +832,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // events received: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400); - // events distributed - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // events received: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(400); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(400); + // events distributed + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(NUM_PUTS); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + }); vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); } @@ -888,24 +886,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS)); - ArrayList<Integer> v4Sender1List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0)); - ArrayList<Integer> v4Sender2List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0)); - - assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size - assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived - assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued - assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed - assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed - assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed - - assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size - assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived - assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued - assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed - assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed - assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed + await().untilAsserted(() -> { + List<Integer> v4Sender1List = vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0)); + assertThat(v4Sender1List.get(0).intValue()).isEqualTo(0); // queue size + assertThat(v4Sender1List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived + assertThat(v4Sender1List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued + assertThat(v4Sender1List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed + assertThat(v4Sender1List.get(4) >= 10).isTrue(); // batches distributed + assertThat(v4Sender1List.get(5).intValue()).isEqualTo(0); // batches redistributed + }); + + await().untilAsserted(() -> { + List<Integer> v4Sender2List = vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0)); + assertThat(v4Sender2List.get(0).intValue()).isEqualTo(0); // queue size + assertThat(v4Sender2List.get(1).intValue()).isEqualTo(NUM_PUTS); // eventsReceived + assertThat(v4Sender2List.get(2).intValue()).isEqualTo(NUM_PUTS); // events queued + assertThat(v4Sender2List.get(3).intValue()).isEqualTo(NUM_PUTS); // events distributed + assertThat(v4Sender2List.get(4) >= 10).isTrue(); // batches distributed + assertThat(v4Sender2List.get(5).intValue()).isEqualTo(0); // batches redistributed + }); vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); @@ -944,25 +943,27 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size - int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); - // We may see a single retried event on all members due to the kill - assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents) - .isTrue(); // eventsReceived - int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); - assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued - // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : - // its quite possible that vm4 has distributed some of the events - // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its - // quite possible that vm4 has distributed some of the batches. - assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed + await().untilAsserted(() -> { + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size + int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); + // We may see a single retried event on all members due to the kill + assertThat(3000 <= receivedEvents && 3003 >= receivedEvents).as("Received " + receivedEvents) + .isTrue(); // eventsReceived + int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); + assertThat(3000 <= queuedEvents && 3003 >= queuedEvents).as("Queued " + queuedEvents) + .isTrue(); // eventsQueued + // assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : + // its quite possible that vm4 has distributed some of the events + // assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : + // its + // quite possible that vm4 has distributed some of the batches. + assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches + // redistributed + }); vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000)); } @@ -1012,22 +1013,23 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 2000)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size - int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); - // We may see two retried events (as transactions are made of 2 events) on all members due to - // the kill - assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents) - .isTrue(); // eventsReceived - int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); - assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents).isTrue(); // eventsQueued - assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches redistributed + await().untilAsserted(() -> { + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertThat(v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); // queue size + int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); + // We may see two retried events (as transactions are made of 2 events) on all members due to + // the kill + assertThat(6000 <= receivedEvents && 6006 >= receivedEvents).as("Received " + receivedEvents) + .isTrue(); // eventsReceived + int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); + assertThat(6000 <= queuedEvents && 6006 >= queuedEvents).as("Queued " + queuedEvents) + .isTrue(); // eventsQueued + assertThat(v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); // batches + // redistributed + }); // batchesReceived is equal to numberOfEntries/(batchSize+1) // As transactions are 2 events long, for each batch it will always be necessary to @@ -1065,19 +1067,17 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { // verify that all is well in local site. All the events should be present in local region vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - - // batches distributed: it's quite possible that vm4 has distributed some of the batches. - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue(); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue(); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + + // batches distributed: it's quite possible that vm4 has distributed some of the batches. + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1).isTrue(); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1).isTrue(); + }); } @Test @@ -1111,29 +1111,28 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size: - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - // events received: - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000); - // events queued: - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900); - // events distributed: - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800); - // batches distributed: - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue(); - // batches redistributed: - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); - // events filtered: - assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size: + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + // events received: + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(1000); + // events queued: + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(900); + // events distributed: + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(800); + // batches distributed: + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80).isTrue(); + // batches redistributed: + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + // events filtered: + assertThat(v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)).isEqualTo(200); + }); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800)); } @@ -1198,28 +1197,30 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0)); - List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // Verify final stats - // 0 -> eventQueueSize - // 1 -> eventsReceived - // 2 -> eventsQueued - // 3 -> eventsDistributed - // 4 -> batchesDistributed - // 5 -> batchesRedistributed - // 7 -> eventsNotQueuedConflated - // 9 -> conflationIndexesMapSize - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200); - assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200); - assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150); - assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); - assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); - assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50); - assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // Verify final stats + // 0 -> eventQueueSize + // 1 -> eventsReceived + // 2 -> eventsQueued + // 3 -> eventsDistributed + // 4 -> batchesDistributed + // 5 -> batchesRedistributed + // 7 -> eventsNotQueuedConflated + // 9 -> conflationIndexesMapSize + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + assertThat(v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)).isEqualTo(200); + assertThat(v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)).isEqualTo(200); + assertThat(v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)).isEqualTo(150); + assertThat(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10).isTrue(); + assertThat(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)).isEqualTo(0); + assertThat(v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)).isEqualTo(50); + assertThat(v4List.get(9) + v5List.get(9) + v6List.get(9) + v7List.get(9)).isEqualTo(0); + }); } @Test @@ -1441,12 +1442,14 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { } private void verifyConflationIndexesSize(String senderId, int expectedSize, VM... vms) { - int actualSize = 0; - for (VM vm : vms) { - List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1)); - actualSize += stats.get(9); - } - assertThat(actualSize).isEqualTo(expectedSize); + await().untilAsserted(() -> { + int actualSize = 0; + for (VM vm : vms) { + List<Integer> stats = vm.invoke(() -> WANTestBase.getSenderStats(senderId, -1)); + actualSize += stats.get(9); + } + assertThat(actualSize).isEqualTo(expectedSize); + }); } private void putSameEntry(String regionName, int numIterations) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java index 7fc0d15fd9..aac27e1393 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import java.io.File; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -177,12 +176,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.pauseSender("ln")); vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); - // secondary queue size stats in serial queue should be 0 - assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0); + + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000)); + // secondary queue size stats in serial queue should be 0 + assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0); + }); Map<String, List<?>> primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); Map<String, List<?>> secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java index 4e5358b478..53d52b7f49 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java @@ -26,8 +26,8 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; -import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -202,17 +202,15 @@ public class SerialWANConflationDUnitTest extends WANTestBase { vm6.invoke(() -> resumeSender("ln")); vm7.invoke(() -> resumeSender("ln")); - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as( - "No events conflated in batch").isTrue(); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as( + "No events conflated in batch").isTrue(); + }); } @Test @@ -256,28 +254,28 @@ public class SerialWANConflationDUnitTest extends WANTestBase { vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues)); } - ArrayList<Integer> v4List = - (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20)); - assertThat((int) v4List.get(0)).as( - "After conflation during enqueue, there should be only 20 events").isEqualTo(20); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20)); + assertThat((int) v4List.get(0)).as( + "After conflation during enqueue, there should be only 20 events").isEqualTo(20); + }); vm4.invoke(() -> resumeSender("ln")); vm5.invoke(() -> resumeSender("ln")); vm6.invoke(() -> resumeSender("ln")); vm7.invoke(() -> resumeSender("ln")); - v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v5List = - (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v6List = - (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - ArrayList<Integer> v7List = - (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as( - "No events in secondary queue stats since it's serial sender").isEqualTo(0); - assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as( - "Total queued events should be 100").isEqualTo(100); + await().untilAsserted(() -> { + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertThat((v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))).as( + "No events in secondary queue stats since it's serial sender").isEqualTo(0); + assertThat((v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))).as( + "Total queued events should be 100").isEqualTo(100); + }); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java index de4a19abaa..a027fdd274 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java @@ -921,28 +921,26 @@ public class SerialWANStatsDUnitTest extends WANTestBase { private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated( boolean isBatchesRedistributed) { - // Wait for sender queues to be empty - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v5List = - vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v6List = - vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v7List = - vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - // queue size must be 0 - assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); - - // batches redistributed: - int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5); - if (isBatchesRedistributed) { - assertThat(batchesRedistributed).isGreaterThan(0); - } else { - assertThat(batchesRedistributed).isEqualTo(0); - } + await().untilAsserted(() -> { + // Wait for sender queues to be empty + List<Integer> v4List = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v6List = vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v7List = vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + // queue size must be 0 + assertThat(v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)).isEqualTo(0); + + // batches redistributed: + int batchesRedistributed = v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5); + if (isBatchesRedistributed) { + assertThat(batchesRedistributed).isGreaterThan(0); + } else { + assertThat(batchesRedistributed).isEqualTo(0); + } - // batches with incomplete transactions must be 0 - assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0); + // batches with incomplete transactions must be 0 + assertThat(v4List.get(13) + v5List.get(13) + v6List.get(13) + v7List.get(13)).isEqualTo(0); + }); } }
