[ https://issues.apache.org/jira/browse/GEODE-8491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195683#comment-17195683 ]
ASF GitHub Bot commented on GEODE-8491: --------------------------------------- DonalEvans commented on a change in pull request #5509: URL: https://github.com/apache/geode/pull/5509#discussion_r488130840 ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java ########## @@ -1646,6 +1646,20 @@ public static void resumeSender(String senderId) { } } + public static void stopSenderInVMsAsync(String senderId, VM... vms) { + List<AsyncInvocation> tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> stopSender(senderId))); + } + for (AsyncInvocation invocation : tasks) { Review comment: The IDE warnings here can be resolved by using `List<AsyncInvocation<Void>>`. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. 
ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> 
WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); } + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = Review comment: The IDE warning here can be resolved by using `AsyncInvocation<Void>`.
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = Review comment: The IDE warning here can be resolved by using `AsyncInvocation<Void>`.
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + 
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 
1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> 
WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + stopSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped primary gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in 
the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop one instance of the sender in NY simultaneously + * Start the stopped instance of the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedPrimarySenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { Review comment: An exception is never thrown from this method, so this "throws" can be removed. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. 
ln + // create senders on site-ln, Note: sender-id is its destination, i.e. ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify 
tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); Review comment: The `AsyncInvocation.join()` method is deprecated. It should be replaced with `AsyncInvocation.await()`. 
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet 
vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); } + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); Review comment: The `AsyncInvocation.join()` method is deprecated. It should be replaced with `AsyncInvocation.await()`. 
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet 
vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); } + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { Review comment: An exception is never thrown from this method, so this can be removed. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. 
ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> 
WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = Review comment: The IDE warning here can be resolved by using `AsyncInvocation<Void>`. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ########## @@ -1118,6 +1115,18 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, } } + private void recordDroppedEvent(EntryEventImpl event) { + final boolean isDebugEnabled = logger.isDebugEnabled(); Review comment: This line could be moved to immediately before the `if (isDebugEnabled)` check so that it's only executed if necessary rather than unnecessarily in the case that `this.eventProcessor != null` evaluates to true.
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + 
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 
1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); Review comment: The `AsyncInvocation.join()` method is deprecated. It should be replaced with `AsyncInvocation.await()`. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { Review comment: An exception is never thrown from this method, so this "throws" can be removed. 
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); Review comment: The `AsyncInvocation.join()` method is deprecated. It should be replaced with `AsyncInvocation.await()`. 
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + 
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = Review comment: The IDE warning here can be resolved by using 
`AsyncInvocation<Void>`. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ########## @@ -297,4 +298,322 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + 
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 
1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { Review comment: An exception is never thrown from this method, so this "throws" can be removed. 
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ########## @@ -370,69 +371,312 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { */ @Test public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); createReceiverInVMs(vm2, vm4); createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. 
ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet 
vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. 
+ */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); } + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.join(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + 
isOffHeap())); + + stopSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped primary gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. 
+ * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop one instance of the sender in NY simultaneously + * Start the stopped instance of the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedPrimarySenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() + throws Exception { Review comment: An exception is never thrown from this method, so the `throws Exception` clause can be removed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Events received when gateway sender is stopped use heap memory > -------------------------------------------------------------- > > Key: GEODE-8491 > URL: https://issues.apache.org/jira/browse/GEODE-8491 > Project: Geode > Issue Type: Bug > Components: wan > Affects Versions: 1.7.0, 1.8.0, 1.9.0, 1.9.1, 1.10.0, 1.9.2, 1.11.0, > 1.12.0, 1.13.0 > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Labels: pull-request-available > > When a gateway sender is stopped, events received are stored in a member > variable of the sender (tmpDroppedEvents) to be later sent to secondary > gateway senders to remove these dropped events from their queues. > These events use heap memory from the member and could eventually fill up the > available memory. > See RFC > [https://cwiki.apache.org/confluence/display/GEODE/Avoid+the+queuing+of+dropped+events+by+the+primary+gateway+sender+when+the+gateway+sender+is+stopped] > -- This message was sent by Atlassian Jira (v8.3.4#803005)