[ https://issues.apache.org/jira/browse/GEODE-8971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298575#comment-17298575 ]
ASF GitHub Bot commented on GEODE-8971: --------------------------------------- albertogpz commented on a change in pull request #6052: URL: https://github.com/apache/geode/pull/6052#discussion_r591130988 ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java ########## @@ -472,6 +473,200 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC } + @Test + public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverCustomerOrderShipmentPR(vm2); + createReceiverInVMs(vm2); + + createCacheInVMs(lnPort, vm4, vm5); + createSenderCustomerOrderShipmentPRs(vm4); + createSenderCustomerOrderShipmentPRs(vm5); + + int batchSize = 10; + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, true, null, false, + true)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, true, null, false, + true)); + + int customers = 4; + int transactionsPerCustomer = 100; + // Each transaction will contain one order plus the following shipments + int shipmentsPerTransaction = batchSize; + AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, customers, + transactionsPerCustomer, shipmentsPerTransaction); + + // wait for some batches to be distributed and then stop the sender + vm4.invoke(() -> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0)); + + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for customer transactions to finish + inv1.await(); + int orderEntries = transactionsPerCustomer * customers; + int shipmentEntries = orderEntries 
* 10; + vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); + vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); + + checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped( + shipmentsPerTransaction); + + // Start sender to validate that queued events do not contain incomplete transactions after + // restart + startSenderInVMsAsync("ln", vm4, vm5); + + checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction); + } + + @Test + public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped() + throws InterruptedException { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverCustomerOrderShipmentPR(vm2); + createReceiverInVMs(vm2); + vm2.invoke(WANTestBase::stopReceivers); + + createCacheInVMs(lnPort, vm4, vm5); + createSenderCustomerOrderShipmentPRs(vm4); + createSenderCustomerOrderShipmentPRs(vm5); + + int batchSize = 10; + vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false, + true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false, + true)); + + int customers = 4; + int transactionsPerCustomer = 100; + // Each transaction will contain one order plus the following shipments + int shipmentsPerTransaction = batchSize; + + AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, customers, + transactionsPerCustomer, shipmentsPerTransaction); + + // wait for some batches to be redistributed and then stop the sender + vm4.invoke(() 
-> await() + .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); + + stopSenderInVMsAsync("ln", vm4, vm5); + + // Wait for the customer transactions to finish + inv1.await(); + int orderEntries = transactionsPerCustomer * customers; + int shipmentEntries = orderEntries * 10; + vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); + vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); + vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); + + // Start receiver and sender + vm2.invoke(WANTestBase::startReceivers); + startSenderInVMsAsync("ln", vm4, vm5); + + checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction); + } + + private AsyncInvocation<Void> asyncExecuteCustomerTransactions(VM vm, int customers, + int transactionsPerCustomer, int shipmentsPerTransaction) { + final Map<Object, Object> keyValuesInTransactions = new LinkedHashMap<>(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + keyValuesInTransactions.put(orderId, new Order()); + for (int j = 0; j < shipmentsPerTransaction; j++) { + ShipmentId shipmentId = new ShipmentId(i + j, orderId); + keyValuesInTransactions.put(shipmentId, new Shipment()); + } + } + } + int eventsPerTransaction = 1 + shipmentsPerTransaction; + return vm.invokeAsync( + () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, + eventsPerTransaction)); + } + + private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped( + int shipmentsPerTransaction) { + waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction); + + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + List<Integer> v5List = 
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + // Check the entries replicated against the number of batches distributed + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted( + int shipmentsPerTransaction) { + // Wait for sender queues to be drained + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + List<Integer> v4List = + vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + List<Integer> v5List = + vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + assertEquals(0, v4List.get(0) + v5List.get(0)); + + // batches with incomplete transactions must be 0 + assertEquals(0, (int) v4List.get(13)); + assertEquals(0, (int) v5List.get(13)); + + waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction); + + // Check the entries replicated against the number of batches distributed + int batchesDistributed = v4List.get(4) + v5List.get(4); + checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, batchesDistributed); + } + + private void checkOnlyCompleteTransactionsAreReplicated(int shipmentsPerTransaction, + int batchesDistributed) { + // Only complete transactions (1 order + 10 shipments) must be replicated + int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName)); + int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName)); + assertEquals(shipmentRegionSize, 10 * orderRegionSize); Review comment: Agree ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batches with incomplete transactions when stopping the gateway sender > --------------------------------------------------------------------- > > Key: GEODE-8971 > URL: https://issues.apache.org/jira/browse/GEODE-8971 > Project: Geode > Issue Type: Improvement > Components: wan > Affects Versions: 1.14.0 > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Labels: pull-request-available > > When the gateway sender is stopped, there is a high probability that batches > with incomplete transactions are sent even if group-transaction-events is > enabled. > The reason is that once the stop command reaches the gateway sender, it > immediately stops queueing events, and this could happen in the middle of > receiving events for the same transaction. If this is the case, some events > for the transaction may have reached the queue right before the stop command > was received and the rest of the events for that transaction would not make it to > the queue (they would be dropped) because they arrived right after the stop > command was received at the gateway sender. -- This message was sent by Atlassian Jira (v8.3.4#803005)