[ https://issues.apache.org/jira/browse/GEODE-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103023#comment-17103023 ]
ASF GitHub Bot commented on GEODE-7971: --------------------------------------- DonalEvans commented on a change in pull request #4928: URL: https://github.com/apache/geode/pull/4928#discussion_r422427262 ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java ########## @@ -299,19 +312,316 @@ public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy( (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size - assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(NUM_PUTS, + v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events - // queued + // queued assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events - // distributed + // distributed assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); // batches - // distributed + // distributed assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches - // redistributed + // redistributed vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS)); } + @Test + public void testPartitionedRegionParallelPropagation_NoGroupTransactionEvents() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + int batchTimeInterval = 10000; + createSenders(lnPort, false, batchTimeInterval); + + createReceiverCustomerOrderShipmentPR(vm2, 0); + + createSenderCustomerOrderShipmentPRs(0); + + startSenderInVMs("ln", 
vm4, vm5, vm6, vm7); + + final Map custKeyValue = new HashMap(); + int intCustId = 1; + CustId custId = new CustId(intCustId); + custKeyValue.put(custId, new Customer()); + vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); + + int transactions = 3; + final Map keyValues = new HashMap(); + for (int i = 0; i < transactions; i++) { + OrderId orderId = new OrderId(i, custId); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValues.put(orderId, new Order()); + keyValues.put(shipmentId1, new Shipment()); + keyValues.put(shipmentId2, new Shipment()); + keyValues.put(shipmentId3, new Shipment()); + } + int eventsPerTransaction = 4; + vm4.invoke(() -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues, + eventsPerTransaction)); + + int entries = (transactions * eventsPerTransaction) + 1; + + vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1)); + vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, transactions)); + vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, transactions * 3)); + + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v5List = + (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v6List = + (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v7List = + (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + + assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size + assertEquals(entries, + v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived + assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events + // queued + assertEquals(entries, 
v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events + // distributed + assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); // batches + // distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches + // redistributed + } + + @Test + public void testPartitionedRegionParallelPropagation_GroupTransactionEvents() Review comment: This test name could be more descriptive. Also, the `throws` is not needed. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java ########## @@ -503,17 +815,16 @@ public void testParallelPropagationWithRemoteRegionDestroy() throws Exception { ArrayList<Integer> v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); // batches - // distributed : - // its quite - // possible that - // vm4 has - // distributed - // some of the - // batches. + // distributed : + // its quite + // possible that + // vm4 has + // distributed + // some of the + // batches. assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); // batches - // redistributed + // redistributed Review comment: More untidy comments. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Gateway sender to deliver transaction events atomically to gateway receivers > ---------------------------------------------------------------------------- > > Key: GEODE-7971 > URL: https://issues.apache.org/jira/browse/GEODE-7971 > Project: Geode > Issue Type: Improvement > Components: wan > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > The goal of this ticket is to implement the necessary changes in the gateway > sender to prevent events belonging to the same transaction from being spread > across different batches. In other words, the goal is to ensure that events from the same > transaction are sent inside the same batch. > This will be an optional feature on gateway senders to be enabled via a new > parameter (--group-transaction-events) and will be restricted to serial > gateway senders with just one dispatcher thread or to parallel gateway > senders. > Apart from the above restriction, grouping of events for a transaction inside > the same batch may only be attained if the regions to which the events belong > are replicated by the same set of gateway senders with the > --group-transaction-events flag enabled. If this condition is not met, the > events will be correctly delivered by the gateway senders, but it will not be > guaranteed that all events will always be sent inside the same batch. > For more details see: > [https://cwiki.apache.org/confluence/display/GEODE/Gw+sender+to+deliver+transaction+events+atomically+to+receivers] > -- This message was sent by Atlassian Jira (v8.3.4#803005)