[ 
https://issues.apache.org/jira/browse/GEODE-8765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247444#comment-17247444
 ] 

ASF GitHub Bot commented on GEODE-8765:
---------------------------------------

DonalEvans commented on a change in pull request #5829:
URL: https://github.com/apache/geode/pull/5829#discussion_r540364988



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1389,6 +1383,21 @@ private void 
peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
     }
   }
 
+  private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List 
batch) {

Review comment:
       The compiler warning here can be fixed if `List<GatewaySenderEventImpl>` 
is used as the method argument. This also means that the for loop below can be 
replaced with 
   ```
   for (GatewaySenderEventImpl event : batch) {
   ```
   and the cast to `GatewaySenderEventImpl` can be removed.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1389,6 +1383,21 @@ private void 
peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
     }
   }
 
+  private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List 
batch) {
+    Map<TransactionId, Integer> incompleteTransactionsInBatch = new 
HashMap<>();
+    for (Object object : batch) {
+      GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
+      if (event.getTransactionId() != null) {
+        if (event.isLastEventInTransaction()) {
+          incompleteTransactionsInBatch.remove(event.getTransactionId());
+        } else {
+          incompleteTransactionsInBatch.put(event.getTransactionId(), 
event.getBucketId());
+        }
+      }
+    }
+    return incompleteTransactionsInBatch;
+  }
+
   private boolean areAllTransactionsCompleteInBatch(Map 
incompleteTransactions) {

Review comment:
       While not part of the changes in this PR, this method could be inlined, 
since it's used in only one place and is only one line.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -534,6 +523,23 @@ private boolean areAllTransactionsCompleteInBatch(Set 
incompleteTransactions) {
     return (incompleteTransactions.size() == 0);

Review comment:
       As with the similar method in `ParallelGatewaySenderQueue`, this method 
could also be inlined.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -487,12 +476,12 @@ public Object peek() throws CacheException {
     // so no need to worry about off-heap refCount.
   }
 
-  private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch,
-      Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) {
+  private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch, 
long lastKey) {

Review comment:
       The compiler warning on this line and several others can be resolved by 
making `batch` a `List<AsyncEvent<?,?>>` here and the other places it's used. 
The lines requiring this change are 416, 421, 431-432, 438, 479 and 526.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -148,6 +149,7 @@ public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn
     TransactionId tx3 = new TXId(null, 3);
 
     GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, 
false);
+    GatewaySenderEventImpl eventNotInTransaction1 = 
createMockGatewaySenderEvent(8, null, false);

Review comment:
       To make the numbering a little more consistent, could this event's key 
be 2, and the subsequent events have their key values increased by 1? That way 
the calls to `bucketRegionQueue.addToQueue()` will have matching keys with the 
keys here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -159,17 +161,18 @@ public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn
         
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
 
     this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(2), eventNotInTransaction1);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(3), event2);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(4), event3);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(5), event4);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(6), event5);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(7), event6);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(8), event7);

Review comment:
       These calls to `Long.valueOf()` can be replaced with just `1L`, `2L` etc.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();

Review comment:
       Compiler warnings on this line can be fixed by using
   ```
   final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, 
custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation inv1 =

Review comment:
       The compiler warning here can be removed by using 
`AsyncInvocation<Void>`.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();

Review comment:
       Compiler warnings on this line can be fixed by using
   ```
   final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
##########
@@ -371,4 +379,107 @@ public void 
testPartitionedSerialPropagationWithParallelThreads() throws Excepti
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000));
   }
+
+  @Test
+  public void 
testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()

Review comment:
       All comments that apply to `ParallelWANPropagationDUnitTest` also apply 
to this test method, since the code in both tests is largely identical.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -1737,26 +1737,37 @@ public static GatewaySenderFactory 
configureGateway(DiskStoreFactory dsf, File[]
     return gateway;
   }
 
+  public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
+      Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
+      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
+    createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, 
isConflation, isPersistent,
+        filter, isManualStart, groupTransactionEvents, 0);
+  }
+
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
       GatewayEventFilter filter, boolean isManualStart) {
     createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, 
isConflation, isPersistent,
-        filter, isManualStart, false);
+        filter, isManualStart, false, 0);
   }
 
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
-      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
+      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents,
+      int dispatcherThreads) {

Review comment:
       Is it necessary to create a new overloaded method here? There is already 
a method `setNumDispatcherThreadsForTheRun()` in `WANTestBase` that allows this 
value to be set to whatever is desired.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, 
custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation inv1 =
+        vm7.invokeAsync(
+            () -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    AsyncInvocation inv2 =

Review comment:
       The compiler warning here can be removed by using 
`AsyncInvocation<Void>`.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -138,7 +139,7 @@ public void 
testBasicDestroyConflationEnabledAndValueNotInRegion() {
   }
 
   @Test
-  public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate()
+  public void 
testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions()
       throws ForceReattemptException {
     ParallelGatewaySenderEventProcessor processor =
         
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

Review comment:
       This variable is never used, so these lines could be just
   ```
   
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException if group-transaction-events enabled and mix of puts with 
> transactions and without transactions
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: GEODE-8765
>                 URL: https://issues.apache.org/jira/browse/GEODE-8765
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>    Affects Versions: 1.14.0
>            Reporter: Alberto Gomez
>            Assignee: Alberto Gomez
>            Priority: Major
>              Labels: pull-request-available
>
> When group-transaction-events is enabled and Geode receives a mix of puts, 
> some inside transactions and some not in transactions, in the case it is 
> needed to add extra events to the batch in order to have all the events for 
> each transaction in the given batch, if the sender runs into an event not 
> belonging to a transaction while looking for events in the queue, a 
> NullPointerException is thrown when trying to get the transactionId for the 
> event. The exception is caught by the sender processor and a warning message 
> is written in the log but some undesired effects are provoked:
>  
>  * In parallel gateway senders, this situation provokes that the batch is 
> sent without completing the transactions and also some events are left in the 
> queues forever without ever being drained (although all the events are sent 
> to the other side).
>  * In serial gateway senders, this situation provokes that once the exception 
> is thrown, no more events are sent to the other side as the events in the 
> last batch that could not be sent because the exception was thrown are tried 
> to be sent over and over without success.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to