[ https://issues.apache.org/jira/browse/GEODE-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298459#comment-17298459 ]
ASF GitHub Bot commented on GEODE-8971: --------------------------------------- DonalEvans commented on a change in pull request #6052: URL: https://github.com/apache/geode/pull/6052#discussion_r590864147 ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java ########## @@ -348,7 +349,201 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv } @Test - public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { + public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + String regionName = testName + "_RR"; + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); + + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + + boolean groupTransactionEvents = true; + int batchSize = 10; + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + + int eventsPerTransaction = batchSize + 1; + // The number of entries must be big enough so that not all entries + // are replicated before the sender is stopped and also divisible by eventsPerTransaction + int entries = 2200; + // Execute some transactions + AsyncInvocation<Void> inv1 = + asyncExecuteTransactions(regionName, 
eventsPerTransaction, entries); + + // wait for batches to be distributed and then stop the sender + vm4.invoke(() -> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0)); + + // These exceptions are ignored here because it could happen that when an event + // is to be handled, the sender is stopped. The sender, when stopped, shuts down + // the thread pool that would handle the event and this could provoke the exception. + addIgnoredException("Exception occurred in CacheListener"); + addIgnoredException(RejectedExecutionException.class); + + // Stop the sender + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for transactions to finish + inv1.await(); + vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName, + eventsPerTransaction); + + // Start the sender + startSenderInVMsAsync("ln", vm4, vm5); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, + eventsPerTransaction); + } + + @Test + public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + String regionName = testName + "_RR"; + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); + vm2.invoke(WANTestBase::stopReceivers); + + createCacheInVMs(lnPort, vm4, vm5); + vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + + boolean 
groupTransactionEvents = true; + int batchSize = 10; + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, + groupTransactionEvents)); + + int eventsPerTransaction = batchSize + 1; + // The number of entries must be big enough so that not all entries + // are replicated before the sender is stopped and also divisible by eventsPerTransaction + int entries = 2200; + // Execute some transactions + AsyncInvocation<Void> inv1 = + asyncExecuteTransactions(regionName, eventsPerTransaction, entries); + + // wait for batches to be redistributed and then stop the sender + vm4.invoke(() -> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); + + // Stop the sender + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for transactions to finish + inv1.await(); + vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); + + // Start the receiver and the sender + vm2.invoke(WANTestBase::startReceivers); + startSenderInVMsAsync("ln", vm4, vm5); + + // Check + checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, + eventsPerTransaction); + } + + private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName, + int eventsPerTransaction) { + waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); + + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = + vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, + 
batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String regionName, + int eventsPerTransaction) { + // Wait for sender queues to be empty + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = + vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(0, v4List.get(0) + v5List.get(0)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); + + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, + batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicated(String regionName, + int eventsPerTransaction, int batchesDistributed) { + int regionSize = vm2.invoke(() -> getRegionSize(regionName)); + + // The number of entries must be divisible by the number of events per transaction + assertEquals(0, regionSize % eventsPerTransaction); + + // Check the entries replicated against the number of batches distributed + vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, + batchesDistributed * eventsPerTransaction)); + } + + private AsyncInvocation<Void> asyncExecuteTransactions(String regionName, + int eventsPerTransaction, int entries) { + final Map<Object, Object> keyValues = new LinkedHashMap<>(); + for (int i = 0; i < entries; i++) { + keyValues.put(i, i + "_Value"); + } + + return vm4.invokeAsync( + () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues, + eventsPerTransaction)); + } + + private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int eventsPerTransaction) { + int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4) + + vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4); + + // Wait for all batches to be 
received by the sender Review comment: I think that this should probably be "received by the receiver" ########## File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImplTest.java ########## @@ -88,5 +97,36 @@ public void testStartWithCleanQueue() { assertTrue(((ConcurrentParallelGatewaySenderQueue) queue).getCleanQueues()); } + @Test + public void whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() { + gatewaysender.start(); + + long start = System.currentTimeMillis(); + + Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime); + Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime); + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long finish = System.currentTimeMillis(); + long timeElapsed = finish - start; + // Each call to preStop waits for 1 second but they are not serialized Review comment: I think this comment would be clearer if it read "Each call to preStop waits for 1 second but these waits execute in parallel" ########## File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImplTest.java ########## @@ -102,6 +102,8 @@ private SerialGatewaySenderImpl createSerialGatewaySenderImplSpy() { doReturn(null).when(spySerialGatewaySender).getQueues(); + doReturn(true).when(spySerialGatewaySender).mustGroupTransactionEvents(); Review comment: The value returned here should only be `true` in tests that specifically want to test the behaviour of group transaction events, since the default value is `false`. 
Maybe the desired value could be passed as an argument to this `createSerialGatewaySenderImplSpy()` method ########## File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImplTest.java ########## @@ -51,11 +54,17 @@ public void setUp() { attrs = new GatewaySenderAttributes(); attrs.isParallel = true; attrs.id = "sender"; + attrs.groupTransactionEvents = true; Review comment: I think it would be best not to set this value in the `setUp()` method for this class, as it's not a default value, and could lead to unexpected behaviour if someone else tries to add tests to this class in the future. Only the test cases that specifically rely on `groupTransactionEvents` being `true` should have it set. ########## File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImplTest.java ########## @@ -132,4 +134,37 @@ public void whenStoppedShouldResetTheEventProcessor() { assertThat(serialGatewaySender.getEventProcessor()).isNull(); } + @Test + public void whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() { + serialGatewaySender = createSerialGatewaySenderImplSpy(); + + long start = System.currentTimeMillis(); + + Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime); + Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime); + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + long finish = System.currentTimeMillis(); + long timeElapsed = finish - start; + + // Each call to preStop waits for 1 second but they are not serialized Review comment: I think this comment would be clearer if it read "Each call to preStop waits for 1 second but these waits execute in parallel" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batches with incomplete transactions when stopping the gateway sender > --------------------------------------------------------------------- > > Key: GEODE-8971 > URL: https://issues.apache.org/jira/browse/GEODE-8971 > Project: Geode > Issue Type: Improvement > Components: wan > Affects Versions: 1.14.0 > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Labels: pull-request-available > > When the gateway sender is stopped, there is a high probability that batches > with incomplete transactions are sent even if group-transaction-events is > enabled. > The reason is that once the stop command reaches the gateway sender, it > immediately stops queueing events, and this could happen in the middle of > receiving events for the same transaction. If this is the case, some events > for the transaction may have reached the queue right before the stop command > was received and the rest of the events for that transaction would not make it to > the queue (they would be dropped) because they arrived right after the stop > command was received at the gateway sender. -- This message was sent by Atlassian Jira (v8.3.4#803005)