[ https://issues.apache.org/jira/browse/GEODE-8765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247444#comment-17247444 ]
ASF GitHub Bot commented on GEODE-8765: --------------------------------------- DonalEvans commented on a change in pull request #5829: URL: https://github.com/apache/geode/pull/5829#discussion_r540364988 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ########## @@ -1389,6 +1383,21 @@ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b } } + private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List batch) { Review comment: The compiler warning here can be fixed if `List<GatewaySenderEventImpl>` is used as the method argument. This also means that the for loop below can be replaced with ``` for (GatewaySenderEventImpl event : batch) { ``` and the cast to `GatewaySenderEventImpl` can be removed. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ########## @@ -1389,6 +1383,21 @@ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b } } + private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List batch) { + Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>(); + for (Object object : batch) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; + if (event.getTransactionId() != null) { + if (event.isLastEventInTransaction()) { + incompleteTransactionsInBatch.remove(event.getTransactionId()); + } else { + incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId()); + } + } + } + return incompleteTransactionsInBatch; + } + private boolean areAllTransactionsCompleteInBatch(Map incompleteTransactions) { Review comment: While not part of the changes in this PR, this method could be inlined, since it's used in only one place and is only one line. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ########## @@ -534,6 +523,23 @@ private boolean areAllTransactionsCompleteInBatch(Set incompleteTransactions) { return (incompleteTransactions.size() == 0); Review comment: As with the similar method in `ParallelGatewaySenderQueue`, this method could also be inlined. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java ########## @@ -487,12 +476,12 @@ public Object peek() throws CacheException { // so no need to worry about off-heap refCount. } - private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch, - Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) { + private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch, long lastKey) { Review comment: The compiler warning on this line and several others can be resolved by making `batch` a `List<AsyncEvent<?,?>>` here and the other places it's used. The lines requiring this change are 416, 421, 431-432, 438, 479 and 526. ########## File path: geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java ########## @@ -148,6 +149,7 @@ public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn TransactionId tx3 = new TXId(null, 3); GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false); + GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(8, null, false); Review comment: To make the numbering a little more consistent, could this event's key be 2, and the subsequent events have their key values increased by 1? That way the calls to `bucketRegionQueue.addToQueue()` will have matching keys with the keys here. ########## File path: geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java ########## @@ -159,17 +161,18 @@ public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn .cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII); this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1); - this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2); - this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3); - this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4); - this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5); - this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6); - this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7); + this.bucketRegionQueue.addToQueue(Long.valueOf(2), eventNotInTransaction1); + this.bucketRegionQueue.addToQueue(Long.valueOf(3), event2); + this.bucketRegionQueue.addToQueue(Long.valueOf(4), event3); + this.bucketRegionQueue.addToQueue(Long.valueOf(5), event4); + this.bucketRegionQueue.addToQueue(Long.valueOf(6), event5); + this.bucketRegionQueue.addToQueue(Long.valueOf(7), event6); + this.bucketRegionQueue.addToQueue(Long.valueOf(8), event7); Review comment: These calls to `Long.valueOf()` can be replaced with just `1L`, `2L` etc. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ########## @@ -1235,6 +1242,107 @@ public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln")); } + @Test + public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map keyValuesInTransactions = new HashMap(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValuesInTransactions.put(orderId, new Order()); + keyValuesInTransactions.put(shipmentId1, new Shipment()); + keyValuesInTransactions.put(shipmentId2, new Shipment()); + keyValuesInTransactions.put(shipmentId3, new Shipment()); + } + } + + int ordersPerCustomerNotInTransactions = 1000; + + final Map keyValuesNotInTransactions = new HashMap(); Review comment: Compiler warnings on this line can be fixed by using ``` final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>(); ``` ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ########## @@ -1235,6 +1242,107 @@ public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln")); } + @Test + public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map keyValuesInTransactions = new HashMap(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValuesInTransactions.put(orderId, new Order()); + keyValuesInTransactions.put(shipmentId1, new Shipment()); + keyValuesInTransactions.put(shipmentId2, new Shipment()); + keyValuesInTransactions.put(shipmentId3, new Shipment()); + } + } + + int ordersPerCustomerNotInTransactions = 1000; + + final Map keyValuesNotInTransactions = new HashMap(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); + keyValuesNotInTransactions.put(orderId, new Order()); + } + } + + // eventsPerTransaction is 1 (orders) + 3 (shipments) + int eventsPerTransaction = 4; + AsyncInvocation inv1 = Review comment: The compiler warning here can be removed by using `AsyncInvocation<Void>`. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ########## @@ -1235,6 +1242,107 @@ public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln")); } + @Test + public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map keyValuesInTransactions = new HashMap(); Review comment: Compiler warnings on this line can be fixed by using ``` final Map<Object, Object> keyValuesInTransactions = new HashMap<>(); ``` ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java ########## @@ -371,4 +379,107 @@ public void testPartitionedSerialPropagationWithParallelThreads() throws Excepti vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); } + + @Test + public void testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() Review comment: All comments that apply to `ParallelWANPropagationDUnitTest` also apply to this test method, since the code in both tests is largely identical. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java ########## @@ -1737,26 +1737,37 @@ public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] return gateway; } + public static void createSender(String dsName, int remoteDsId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart, boolean groupTransactionEvents) { + createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, + filter, isManualStart, groupTransactionEvents, 0); + } + public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManualStart) { createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, - filter, isManualStart, false); + filter, isManualStart, false, 0); } public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart, boolean groupTransactionEvents) { + GatewayEventFilter filter, boolean isManualStart, boolean groupTransactionEvents, + int dispatcherThreads) { Review comment: Is it necessary to create a new overloaded method here? There is already a method `setNumDispatcherThreadsForTheRun()` in `WANTestBase` that allows this value to be set to whatever is desired. ########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ########## @@ -1235,6 +1242,107 @@ public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln")); } + @Test + public void testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm5.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm6.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + vm7.invoke( + () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true, true, 2)); + + vm4.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm5.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm6.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + vm7.invoke( + () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 2, 10, + isOffHeap())); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap())); + + int customers = 4; + + int transactionsPerCustomer = 1000; + final Map keyValuesInTransactions = new HashMap(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < transactionsPerCustomer; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i, custIdObject); + ShipmentId shipmentId1 = new ShipmentId(i, orderId); + ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); + ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); + keyValuesInTransactions.put(orderId, new Order()); + keyValuesInTransactions.put(shipmentId1, new Shipment()); + keyValuesInTransactions.put(shipmentId2, new Shipment()); + keyValuesInTransactions.put(shipmentId3, new Shipment()); + } + } + + int ordersPerCustomerNotInTransactions = 1000; + + final Map keyValuesNotInTransactions = new HashMap(); + for (int custId = 0; custId < customers; custId++) { + for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) { + CustId custIdObject = new CustId(custId); + OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); + keyValuesNotInTransactions.put(orderId, new Order()); + } + } + + // eventsPerTransaction is 1 (orders) + 3 (shipments) + int eventsPerTransaction = 4; + AsyncInvocation inv1 = + vm7.invokeAsync( + () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, + eventsPerTransaction)); + + AsyncInvocation inv2 = Review comment: The compiler warning here can be removed by using `AsyncInvocation<Void>`. ########## File path: geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java ########## @@ -138,7 +139,7 @@ public void testBasicDestroyConflationEnabledAndValueNotInRegion() { } @Test - public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate() + public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions() throws ForceReattemptException { ParallelGatewaySenderEventProcessor processor = ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); Review comment: This variable is never used, so these lines could be just ``` ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > NullPointerException if group-transaction-events enabled and mix of puts with > transactions and without transactions > ------------------------------------------------------------------------------------------------------------------- > > Key: GEODE-8765 > URL: https://issues.apache.org/jira/browse/GEODE-8765 > Project: Geode > Issue Type: Bug > Components: wan > Affects Versions: 1.14.0 > Reporter: Alberto Gomez > Assignee: Alberto Gomez > Priority: Major > Labels: pull-request-available > > When group-transaction-events is enabled and Geode receives a mix of puts, > some inside transactions and some not in transactions, in the case it is > needed to add extra events to the batch in order to have all the events for > each transaction in the given batch, if the sender runs into an event not > belonging to a transaction while looking for events in the queue, a > NullPointerException is thrown when trying to get the transactionId for the > event. The exception is caught by the sender processor and a warning message > is written in the log but some undesired effects are provoked: > > * In parallel gateway senders, this situation provokes that the batch is > sent without completing the transactions and also some events are left in the > queues forever without ever being drained (although all the events are sent > to the other side). > * In serial gateway senders, this situation provokes that once the exception > is thrown, no more events are sent to the other side as the events in the > last batch that could not be sent because the exception was thrown are tried > to be sent over and over without success. -- This message was sent by Atlassian Jira (v8.3.4#803005)