[ https://issues.apache.org/jira/browse/GEODE-8455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185431#comment-17185431 ]
ASF GitHub Bot commented on GEODE-8455:
---------------------------------------

DonalEvans commented on a change in pull request #5476:
URL: https://github.com/apache/geode/pull/5476#discussion_r477509049

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3197,6 +3114,18 @@ public static Boolean killSender(String senderId) {
     }
   }
+  private static AbstractGatewaySender getAbstractGatewaySender(String senderId) {

Review comment:
   This method almost entirely duplicates the existing `getGatewaySender()` method, so it can be removed and calls to it replaced with `(AbstractGatewaySender) getGatewaySender(senderId)`

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void testParallelPropagationWithFilter_AfterAck() throws Exception {
   }
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+
+    vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * No redundancy in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

Review comment:
   These casts to `Integer` are redundant and can be removed.
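
A minimal sketch of why such a cast is redundant, assuming (as the comment implies) that `VM.invoke` is declared with a generic return type. `FakeVm`, `CastSketch` and the port arithmetic are hypothetical stand-ins so the example runs without DUnit; they are not Geode APIs.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the DUnit VM class; only the generic signature matters.
final class FakeVm {
  // The return type is the type variable V, so the compiler infers
  // Integer from the lambda and the caller needs no cast.
  <V> V invoke(Supplier<V> callable) {
    return callable.get();
  }
}

final class CastSketch {
  // Hypothetical stand-in for WANTestBase.createFirstLocatorWithDSId(int).
  static Integer createFirstLocatorWithDSId(int dsId) {
    return 10000 + dsId; // made-up port number, for illustration only
  }

  static Integer startLocator(FakeVm vm0) {
    // Before: Integer lnPort = (Integer) vm0.invoke(...);
    Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
    return lnPort;
  }
}
```

If the real `invoke` overload returned `Object` instead, the cast would still be required, so the change is worth confirming against the `VM` class in the branch under review.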
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -190,6 +190,16 @@ protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() {
         isOffHeap());
   }
+  protected SerializableRunnableIF createPartitionedRegionRedundancy1RunnableNoSenders() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+        isOffHeap());
+  }
+
+  protected SerializableRunnableIF createPartitionedRegionRedundancy0RunnableNoSenders() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 0, 100,
+        isOffHeap());
+  }
+

Review comment:
   These methods are very similar. Would it be possible to replace them with one method which takes an `int` argument for the desired redundancy level?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void testParallelPropagationWithFilter_AfterAck() throws Exception {
   }
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

Review comment:
   These casts to `Integer` are redundant and can be removed.
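
On the suggestion to merge the two `...NoSenders` helpers in this file, one possible shape is sketched below. Everything here is a hypothetical stand-in: `createPartitionedRegion` only records its arguments so the example runs without a cluster, `Runnable` replaces `SerializableRunnableIF`, and the merged method name is an assumption rather than something taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

final class RedundancyHelperSketch {
  // Records calls so the sketch can run without Geode.
  static final List<String> calls = new ArrayList<>();

  // Hypothetical stand-in for WANTestBase.createPartitionedRegion; the argument
  // order mirrors the call in the diff (name, null, redundancy, 100, off-heap flag).
  static void createPartitionedRegion(String regionName, String senderIds, int redundancy,
      int fourthArg, boolean offHeap) {
    calls.add(regionName + ":" + redundancy);
  }

  // One helper parameterised by redundancy replaces the Redundancy1 and
  // Redundancy0 variants; 100 is copied unchanged from the diff.
  static Runnable createPartitionedRegionRunnableNoSenders(String regionName, int redundancy,
      boolean offHeap) {
    return () -> createPartitionedRegion(regionName, null, redundancy, 100, offHeap);
  }
}
```

The tests would then call the helper with `1` or `0` instead of choosing between two method names.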
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String senderId) {
   }
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: " + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, ags.getTmpDroppedEventSize()));
   }
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();

Review comment:
   This should be `Set<RegionQueue>` to avoid compiler warnings.
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String senderId) {
   }
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: " + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, ags.getTmpDroppedEventSize()));
   }
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();
+    for (Object queue : queues) {
+      PartitionedRegion region =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion();
+      int buckets = region.getTotalNumberOfBuckets();
+      for (int bucket = 0; bucket < buckets; bucket++) {
+        BlockingQueue newQueue =
+            ((ConcurrentParallelGatewaySenderQueue) queue).getBucketTmpQueue((int) bucket);

Review comment:
   The cast to `int` here is unnecessary.
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String senderId) {
   }
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: " + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, ags.getTmpDroppedEventSize()));
   }
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();
+    for (Object queue : queues) {
+      PartitionedRegion region =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) queue).getRegion();
+      int buckets = region.getTotalNumberOfBuckets();
+      for (int bucket = 0; bucket < buckets; bucket++) {
+        BlockingQueue newQueue =

Review comment:
   This should be `BlockingQueue<GatewaySenderEventImpl>` to avoid compiler warnings.
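
The comments on `validateEmptyBucketToTempQueueMap` (raw `Set`, raw `BlockingQueue`, the `(int)` cast) could be addressed together, roughly as in the sketch below. It is not the Geode implementation: `Event` and `SenderQueue` are hypothetical stand-ins for `GatewaySenderEventImpl` and `ConcurrentParallelGatewaySenderQueue`, the bucket count is read from the queue rather than from the region for brevity, and the null check assumes a bucket may have no temporary queue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class TypedQueuesSketch {
  // Hypothetical stand-in for GatewaySenderEventImpl.
  static final class Event {}

  // Hypothetical stand-in for ConcurrentParallelGatewaySenderQueue.
  static final class SenderQueue {
    private final int totalBuckets;
    private final Map<Integer, BlockingQueue<Event>> tmpQueues = new HashMap<>();

    SenderQueue(int totalBuckets) {
      this.totalBuckets = totalBuckets;
    }

    void addTmpEvent(int bucket, Event event) {
      tmpQueues.computeIfAbsent(bucket, b -> new LinkedBlockingQueue<>()).add(event);
    }

    int getTotalNumberOfBuckets() {
      return totalBuckets;
    }

    // Returns null when the bucket has no temporary queue (an assumption).
    BlockingQueue<Event> getBucketTmpQueue(int bucket) {
      return tmpQueues.get(bucket);
    }
  }

  // Parameterised types remove the unchecked warnings and the casts inside
  // the loop; `bucket` is already an int, so no (int) cast is needed either.
  static int countTmpEvents(Set<SenderQueue> queues) {
    int size = 0;
    for (SenderQueue queue : queues) {
      for (int bucket = 0; bucket < queue.getTotalNumberOfBuckets(); bucket++) {
        BlockingQueue<Event> tmpQueue = queue.getBucketTmpQueue(bucket);
        if (tmpQueue != null) {
          size += tmpQueue.size();
        }
      }
    }
    return size;
  }
}
```

In the real test, the element type of `getQueues()` is `RegionQueue` per the reviewer, so a cast to the concrete queue class may still be needed inside the loop.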
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void testParallelPropagationWithFilter_AfterAck() throws Exception {
   }
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+
+    vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+    vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + "_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * No redundancy in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+      throws Exception {

Review comment:
   No exception is thrown from this method, so this can be removed.
##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void testParallelPropagationWithFilter_AfterAck() throws Exception {
   }
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned region through attributes
+   * mutator, transaction events are not sent to all region members but only to those who are
+   * hosting the bucket for the event and thus, events are not stored in the bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {

Review comment:
   No exception is thrown from this method, so this can be removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Different behavior in transactions on partitioned regions between creating
> the region with a parallel gateway sender vs altering the region to add the
> parallel gateway sender
> ---------------------------------------------------------------------------
>
>                 Key: GEODE-8455
>                 URL: https://issues.apache.org/jira/browse/GEODE-8455
>             Project: Geode
>          Issue Type: Bug
>          Components: core
>            Reporter: Alberto Gomez
>            Assignee: Alberto Gomez
>            Priority: Major
>              Labels: pull-request-available
>
> When a partitioned region is created and then altered to add a parallel
> gateway sender, I have observed that each transaction sent to the region
> causes an event (GatewaySenderEvent) to be sent to every server hosting
> buckets for the region, not only to the server or servers hosting the bucket
> to which the data in the transaction belongs.
> For example, in a partitioned region hosted by two cache servers with
> redundancy zero, the put for a transaction is done on the server hosting the
> bucket where the data is to be stored, but an event for the put is sent to
> both members. On the server that does not host the bucket locally, the event
> (GatewaySenderEvent) is then stored in the bucketToTempQueueMap member
> variable of the ParallelGatewaySenderQueue.
> Those events are eventually removed from that map as the gateway sender
> sends events to the remote site. However, if the remote site cannot be
> reached, the events stay in the map, which could exhaust the heap if enough
> transactions arrive while the remote site remains unreachable.
> Events for a transaction should be sent only to the servers hosting the
> bucket for the data in the transaction, not to all servers.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)