[ https://issues.apache.org/jira/browse/GEODE-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299021#comment-17299021 ]
ASF GitHub Bot commented on GEODE-8971: --------------------------------------- DonalEvans commented on a change in pull request #6052: URL: https://github.com/apache/geode/pull/6052#discussion_r591745355 ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java ########## @@ -348,7 +349,201 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv } @Test - public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { + public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + String regionName = testName + "_RR"; + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); + + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + + boolean groupTransactionEvents = true; + int batchSize = 10; + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + + int eventsPerTransaction = batchSize + 1; + // The number of entries must be big enough so that not all entries + // are replicated before the sender is stopped and also divisible by eventsPerTransaction + int entries = 2200; + // Execute some transactions + AsyncInvocation<Void> inv1 = + asyncExecuteTransactions(regionName, 
eventsPerTransaction, entries); + + // wait for batches to be distributed and then stop the sender + vm4.invoke(() -> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0)); + + // These exceptions are ignored here because it could happen that when an event + // is to be handled, the sender is stopped. The sender, when stopped, shuts down + // the thread pool that would handle the event and this could provoke the exception. + addIgnoredException("Exception occurred in CacheListener"); + addIgnoredException(RejectedExecutionException.class); + + // Stop the sender + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for transactions to finish + inv1.await(); + vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName, + eventsPerTransaction); + + // Start the sender + startSenderInVMsAsync("ln", vm4, vm5); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, + eventsPerTransaction); + } + + @Test + public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + String regionName = testName + "_RR"; + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); + vm2.invoke(WANTestBase::stopReceivers); + + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + + boolean 
groupTransactionEvents = true; + int batchSize = 10; + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + + int eventsPerTransaction = batchSize + 1; + // The number of entries must be big enough so that not all entries + // are replicated before the sender is stopped and also divisible by eventsPerTransaction + int entries = 2200; + // Execute some transactions + AsyncInvocation<Void> inv1 = + asyncExecuteTransactions(regionName, eventsPerTransaction, entries); + + // wait for batches to be redistributed and then stop the sender + vm4.invoke(() -> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); + + // Stop the sender + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for transactions to finish + inv1.await(); + vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + + // Start the receiver and the sender + vm2.invoke(WANTestBase::startReceivers); + startSenderInVMsAsync("ln", vm4, vm5); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, + eventsPerTransaction); + } + + private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName, + int eventsPerTransaction) { + waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); + + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = + vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, + 
batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String regionName, + int eventsPerTransaction) { + // Wait for sender queues to be empty + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = + vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(0, v4List.get(0) + v5List.get(0)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); + + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, + batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicated(String regionName, + int eventsPerTransaction, int batchesDistributed) { + int regionSize = vm2.invoke(() -> getRegionSize(regionName)); + + // The number of entries must be divisible by the number of events per transaction + assertEquals(0, regionSize % eventsPerTransaction); + + // Check the entries replicated against the number of batches distributed + vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, + batchesDistributed * eventsPerTransaction)); + } + + private AsyncInvocation<Void> asyncExecuteTransactions(String regionName, + int eventsPerTransaction, int entries) { + final Map<Object, Object> keyValues = new LinkedHashMap<>(); + for (int i = 0; i < entries; i++) { + keyValues.put(i, i + "_Value"); + } + + return vm4.invokeAsync( + () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues, + eventsPerTransaction)); + } + + private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int eventsPerTransaction) { + int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4) + + vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4); + + // Wait for all batches to be 
received by the sender Review comment: It looks like this got missed in the last set of changes, but other than that, everything looks good. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batches with incomplete transactions when stopping the gateway sender > --------------------------------------------------------------------- > > Key: GEODE-8971 > URL: https://issues.apache.org/jira/browse/GEODE-8971 > Project: Geode > Issue Type: Improvement > Components: wan > Affects Versions: 1.14.0 > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Labels: pull-request-available > > When the gateway sender is stopped there is a high probability that batches > with incomplete transactions are sent even if group-transaction-events is > enabled. > The reason is that once the stop command reaches the gateway sender, it > immediately stops queueing events, and this could happen in the middle of > receiving events for the same transaction. If this is the case, some events > for the transaction may have reached the queue right before the stop command > was received and the rest of events for that transaction would not make it to > the queue (they would be dropped) because they arrived right after the stop > command was received at the gateway sender. -- This message was sent by Atlassian Jira (v8.3.4#803005)