This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f57745ce6 GEODE-10242: Colocated buckets share primary move lock 
(#7630)
6f57745ce6 is described below

commit 6f57745ce65583d49459d5cb120985d3b72e26af
Author: Eric Shu <[email protected]>
AuthorDate: Tue May 3 10:48:00 2022 -0700

    GEODE-10242: Colocated buckets share primary move lock (#7630)
    
     * All colocated buckets now share the same primaryMoveReadWriteLock.
       When parent bucket is being moved, no operations will be executed on
       child buckets as well. So moving primary for all colocated buckets
       shold be faster, and there is no need to hold parent locks anymore.
    
    * Added.a dunit test to validate the fix works.
---
 .../apache/geode/internal/cache/BucketAdvisor.java |  43 +++---
 .../apache/geode/internal/cache/BucketRegion.java  |  39 +----
 .../geode/internal/cache/BucketAdvisorTest.java    |  32 ++---
 .../geode/internal/cache/wan/WANTestBase.java      |  13 +-
 .../ParallelWANConflationDistributedTest.java      | 159 +++++++++++++++++++++
 5 files changed, 210 insertions(+), 76 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 2b70f868d2..e6dcd3fb8f 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -142,15 +142,15 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
    * A read/write lock to prevent making this bucket not primary while a write 
is in progress on the
    * bucket.
    */
-  private final ReadWriteLock primaryMoveReadWriteLock = new 
ReentrantReadWriteLock();
-  private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
-  private final Lock primaryMoveWriteLock = 
primaryMoveReadWriteLock.writeLock();
+  private final ReadWriteLock primaryMoveReadWriteLock;
+  private final Lock primaryMoveReadLock;
+  private final Lock primaryMoveWriteLock;
 
   /**
    * The advisor for the bucket region that we are colocated with, if this 
region is a colocated
    * region.
    */
-  private BucketAdvisor parentAdvisor;
+  private final BucketAdvisor parentAdvisor;
 
   /**
    * The member that is responsible for choosing the primary for this bucket. 
While this field is
@@ -180,7 +180,15 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     pRegion = this.regionAdvisor.getPartitionedRegion();
     redundancyTracker =
         new BucketRedundancyTracker(pRegion.getRedundantCopies(), 
pRegion.getRedundancyTracker());
-    resetParentAdvisor(bucket.getId());
+    parentAdvisor = getParentAdvisor(bucket.getId());
+
+    if (parentAdvisor == null) {
+      primaryMoveReadWriteLock = new ReentrantReadWriteLock();
+    } else {
+      primaryMoveReadWriteLock = parentAdvisor.primaryMoveReadWriteLock;
+    }
+    primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
+    primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
   }
 
   public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor 
regionAdvisor) {
@@ -189,7 +197,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
     return advisor;
   }
 
-  private void resetParentAdvisor(int bucketId) {
+  private BucketAdvisor getParentAdvisor(int bucketId) {
     PartitionedRegion colocatedRegion = 
ColocationHelper.getColocatedRegion(pRegion);
     if (colocatedRegion != null) {
       if (colocatedRegion.isFixedPartitionedRegion()) {
@@ -197,18 +205,15 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
         if (fpas != null) {
           for (FixedPartitionAttributesImpl fpa : fpas) {
             if (fpa.hasBucket(bucketId)) {
-              parentAdvisor =
-                  
colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID());
-              break;
+              return 
colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID());
             }
           }
         }
       } else {
-        parentAdvisor = 
colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
+        return colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
       }
-    } else {
-      parentAdvisor = null;
     }
+    return null;
   }
 
   private void assignStartingBucketAdvisorIfFixedPartitioned() {
@@ -240,19 +245,6 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     return primaryMoveReadLock;
   }
 
-  /**
-   * Returns the lock that prevents the parent's primary from moving while 
active writes are in
-   * progress. This should be locked before checking if the local bucket is 
primary.
-   *
-   * @return the lock for in-progress write operations
-   */
-  Lock getParentPrimaryMoveReadLock() {
-    if (parentAdvisor != null) {
-      return parentAdvisor.getPrimaryMoveReadLock();
-    }
-    return null;
-  }
-
   /**
    * Try to lock the primary bucket to make sure no operation is on-going at 
current bucket.
    *
@@ -899,6 +891,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
         releasePrimaryLock();
         // this was a deposePrimary call so we need to depose children as well
         deposePrimaryForColocatedChildren();
+
         if (pRegion.isFixedPartitionedRegion()) {
           deposeOtherPrimaryBucketForFixedPartition();
         }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 421a854498..ffb1109115 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -799,7 +799,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   }
 
   /**
-   * lock this bucket and, if present, its colocated "parent"
+   * lock this bucket
    *
    * @param tryLock - whether to use tryLock (true) or a blocking lock (false)
    * @return true if locks were obtained and are still held
@@ -832,41 +832,20 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
 
   private boolean lockPrimaryStateReadLock(boolean tryLock) {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
     for (;;) {
       boolean interrupted = Thread.interrupted();
       try {
         // Get the lock. If we have to wait here, it's because
         // this VM is actively becoming "not primary". We don't want
         // to throw an exception until this VM is actually no longer
-        // primary, so we wait here for not primary to complete. See bug #39963
-        if (parentLock != null) {
-          if (tryLock) {
-            boolean locked = parentLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            parentLock.lockInterruptibly();
-          }
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              parentLock.unlock();
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
+        // primary, so we wait here for not primary to complete.
+        if (tryLock) {
+          boolean locked = primaryMoveReadLock.tryLock();
+          if (!locked) {
+            return false;
           }
         } else {
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
-          }
+          primaryMoveReadLock.lockInterruptibly();
         }
         break; // success
       } catch (InterruptedException e) {
@@ -886,10 +865,6 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   public void doUnlockForPrimary() {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
     primaryMoveReadLock.unlock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
-    if (parentLock != null) {
-      parentLock.unlock();
-    }
   }
 
   /**
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 817386cb7f..c406092c85 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -19,7 +19,6 @@ import static org.apache.geode.cache.Region.SEPARATOR;
 import static 
org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
@@ -33,7 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import org.apache.geode.cache.PartitionAttributes;
@@ -43,10 +42,10 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 
-public class BucketAdvisorTest {
+class BucketAdvisorTest {
 
   @Test
-  public void shouldBeMockable() throws Exception {
+  void shouldBeMockable() throws Exception {
     BucketAdvisor mockBucketAdvisor = mock(BucketAdvisor.class);
     InternalDistributedMember mockInternalDistributedMember = 
mock(InternalDistributedMember.class);
 
@@ -58,7 +57,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
+  void 
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -86,7 +85,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
 {
+  void 
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
 {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -114,7 +113,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void volunteerForPrimaryIgnoresMissingPrimaryElector() {
+  void volunteerForPrimaryIgnoresMissingPrimaryElector() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -153,12 +152,13 @@ public class BucketAdvisorTest {
         mock(BucketAdvisor.VolunteeringDelegate.class);
     advisorSpy.setVolunteeringDelegate(volunteeringDelegate);
     advisorSpy.initializePrimaryElector(missingElectorId);
-    assertEquals(missingElectorId, advisorSpy.getPrimaryElector());
+    assertThat(missingElectorId).isEqualTo(advisorSpy.getPrimaryElector());
     advisorSpy.volunteerForPrimary();
     verify(volunteeringDelegate).volunteerForPrimary();
   }
 
-  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, 
Boolean> shadowBuckets) {
+  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(
+      Map<String, Boolean> shadowBuckets) {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -180,7 +180,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+  void 
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false, SEPARATOR + 
"b2", true);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -190,7 +190,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
 {
+  void 
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
 {
     Map<String, Boolean> buckets =
         of(SEPARATOR + "b1", false, SEPARATOR + "b2", false, SEPARATOR + "b3", 
false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -201,7 +201,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+  void 
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -217,7 +217,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void isShadowBucketDestroyedShouldReturnCorrectly() {
+  void isShadowBucketDestroyedShouldReturnCorrectly() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", true, SEPARATOR + 
"b2", false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -230,7 +230,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+  void 
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -252,7 +252,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
 {
+  void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
 {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
 
@@ -282,7 +282,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
 {
+  void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
 {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
     InternalDistributedMember memberId2 = new 
InternalDistributedMember("localhost", 323);
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 16ea5f6b91..ce7d19abc5 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -770,10 +770,17 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void createCustomerOrderShipmentPartitionedRegion(String 
senderIds,
       Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
+    createCustomerOrderShipmentPartitionedRegion(senderIds, redundantCopies, 
totalNumBuckets,
+        offHeap, RegionShortcut.PARTITION);
+  }
+
+  public static void createCustomerOrderShipmentPartitionedRegion(String 
senderIds,
+      Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap,
+      RegionShortcut regionShortcut) {
     IgnoredException exp =
         addIgnoredException(ForceReattemptException.class.getName());
     try {
-      RegionFactory<?, ?> fact = 
cache.createRegionFactory(RegionShortcut.PARTITION);
+      RegionFactory<?, ?> fact = cache.createRegionFactory(regionShortcut);
       if (senderIds != null) {
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
@@ -795,7 +802,7 @@ public class WANTestBase extends DistributedTestCase {
       
paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
           .setColocatedWith(customerRegionName)
           .setPartitionResolver(new 
CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
-      fact = cache.createRegionFactory(RegionShortcut.PARTITION);
+      fact = cache.createRegionFactory(regionShortcut);
       if (senderIds != null) {
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
@@ -813,7 +820,7 @@ public class WANTestBase extends DistributedTestCase {
       
paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
           .setColocatedWith(orderRegionName)
           .setPartitionResolver(new 
CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
-      fact = cache.createRegionFactory(RegionShortcut.PARTITION);
+      fact = cache.createRegionFactory(regionShortcut);
       if (senderIds != null) {
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
index f56c12847e..8e732c57f0 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
@@ -21,23 +21,45 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Stream;
 
+import org.apache.logging.log4j.Logger;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.control.RebalanceResults;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.locks.DLockReleaseProcessor;
+import org.apache.geode.internal.cache.UpdateOperation;
 import org.apache.geode.internal.cache.execute.data.CustId;
 import org.apache.geode.internal.cache.execute.data.Customer;
 import org.apache.geode.internal.cache.execute.data.Order;
 import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.execute.data.Shipment;
 import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage;
 import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({WanTest.class})
 public class ParallelWANConflationDistributedTest extends WANTestBase {
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
   private static final long serialVersionUID = 1L;
+  private static final Logger logger = LogService.getLogger();
+  private static final String DEPOSE_PRIMARY_MESSAGE_RECEIVED = 
"received_deposePrimaryMessage";
+  private static final String CONTINUE_PUT_OP = "continue_put_op";
+  private static final String CONTINUE_DEPOSE_PRIMARY = 
"continue_depose_primary";
 
   public ParallelWANConflationDistributedTest() {
     super();
@@ -384,6 +406,103 @@ public class ParallelWANConflationDistributedTest extends 
WANTestBase {
     validateColocatedRegionContents(custKeyValues, orderKeyValues, 
shipmentKeyValues);
   }
 
+  @Test
+  public void 
wanEventNotLostWhenDoOperationOnColocatedRegionsDuringRebalance() throws 
Exception {
+    blackboard.initBlackboard();
+
+    initialSetUp();
+    int numberOfBuckets = 2;
+    int numberOfPuts = 101;
+    int newId = numberOfPuts + 1;
+
+    createSendersNoConflation();
+
+    Stream.of(vm4, vm5, vm6)
+        .forEach(server -> 
server.invoke(this::addDistributionMessageObserver));
+
+    vm4.invoke(
+        () -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 
numberOfBuckets, isOffHeap(),
+            RegionShortcut.PARTITION_PROXY));
+    vm5.invoke(
+        () -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 
numberOfBuckets, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6);
+    vm2.invoke(
+        () -> createCustomerOrderShipmentPartitionedRegion(null, 0, 
numberOfBuckets, isOffHeap()));
+    vm3.invoke(
+        () -> createCustomerOrderShipmentPartitionedRegion(null, 0, 
numberOfBuckets, isOffHeap()));
+
+    vm4.invoke(() -> putCustomerPartitionedRegion(numberOfPuts));
+    vm4.invoke(() -> putOrderPartitionedRegion(numberOfPuts));
+    vm4.invoke(() -> putShipmentPartitionedRegion(numberOfPuts));
+
+    vm6.invoke(
+        () -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 
numberOfBuckets, isOffHeap()));
+
+    AsyncInvocation<?> asyncInvocation = vm4.invokeAsync(this::doRebalance);
+
+    if (!blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+      blackboard.waitForGate(DEPOSE_PRIMARY_MESSAGE_RECEIVED);
+    }
+    vm4.invokeAsync(() -> putInShipmentRegion(newId));
+
+    if (!blackboard.isGateSignaled(CONTINUE_PUT_OP)) {
+      blackboard.waitForGate(CONTINUE_PUT_OP);
+    }
+    vm4.invokeAsync(() -> putInCustomerRegion(newId));
+    vm4.invokeAsync(() -> putInOrderRegion(newId));
+
+    asyncInvocation.get();
+
+    vm2.invoke(() -> verifyContent(newId));
+    vm3.invoke(() -> verifyContent(newId));
+  }
+
+  private void addDistributionMessageObserver() {
+    DistributionMessageObserver.setInstance(new 
TestDistributionMessageObserver());
+  }
+
+  private RebalanceResults doRebalance() throws Exception {
+    ResourceManager manager = cache.getResourceManager();
+    return 
manager.createRebalanceFactory().includeRegions(null).start().getResults();
+  }
+
+  private void putInCustomerRegion(int id) {
+    CustId custid = new CustId(id);
+    Customer customer = new Customer("name" + id, "Address" + id);
+    customerRegion.put(custid, customer);
+  }
+
+  private void putInOrderRegion(int id) {
+    CustId custid = new CustId(id);
+    int oid = id + 1;
+    OrderId orderId = new OrderId(oid, custid);
+    Order order = new Order("ORDER" + oid);
+    orderRegion.put(orderId, order);
+  }
+
+  private void putInShipmentRegion(int id) {
+    CustId custid = new CustId(id);
+    int oid = id + 1;
+    OrderId orderId = new OrderId(oid, custid);
+    int sid = oid + 1;
+    ShipmentId shipmentId = new ShipmentId(sid, orderId);
+    Shipment shipment = new Shipment("Shipment" + sid);
+    shipmentRegion.put(shipmentId, shipment);
+  }
+
+  private void verifyContent(int id) {
+    CustId custid = new CustId(id);
+    int oid = id + 1;
+    OrderId orderId = new OrderId(oid, custid);
+    int sid = oid + 1;
+    ShipmentId shipmentId = new ShipmentId(sid, orderId);
+
+    await().untilAsserted(() -> 
assertThat(orderRegion.get(orderId)).isNotNull());
+    await().untilAsserted(() -> 
assertThat(shipmentRegion.get(shipmentId)).isNotNull());
+    await().untilAsserted(() -> 
assertThat(customerRegion.get(custid)).isNotNull());
+  }
+
   protected void validateColocatedRegionContents(Map<?, ?> custKeyValues, 
Map<?, ?> orderKeyValues,
       Map<?, ?> shipmentKeyValues) {
     vm2.invoke(() -> validateRegionSize(WANTestBase.customerRegionName, 
custKeyValues.size()));
@@ -508,4 +627,44 @@ public class ParallelWANConflationDistributedTest extends 
WANTestBase {
     vm7.invoke(() -> createSender("ln", 2, true, 100, 2, true, false, null, 
true));
   }
 
+  private class TestDistributionMessageObserver extends 
DistributionMessageObserver {
+    public void beforeProcessMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
+      if (message instanceof DeposePrimaryBucketMessage) {
+        logger.info(
+            "TestDistributionMessageObserver.beforeProcessMessage about to 
signal received_deposePrimaryMessage gate",
+            new Exception());
+        blackboard.signalGate(DEPOSE_PRIMARY_MESSAGE_RECEIVED);
+        logger.info(
+            "TestDistributionMessageObserver.beforeProcessMessage done signal 
received_deposePrimaryMessage gate");
+      } else if (message instanceof 
DLockReleaseProcessor.DLockReleaseReplyMessage
+          && blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+        try {
+          logger.info(
+              "TestDistributionMessageObserver.beforeSendMessage about to 
signal on continue_put_op");
+          blackboard.signalGate(CONTINUE_PUT_OP);
+
+          logger.info(
+              "TestDistributionMessageObserver.beforeSendMessage waits for 
signal on "
+                  + CONTINUE_DEPOSE_PRIMARY);
+          blackboard.waitForGate(CONTINUE_DEPOSE_PRIMARY);
+          logger.info(
+              "TestDistributionMessageObserver.beforeSendMessage done wait for 
signal on "
+                  + CONTINUE_DEPOSE_PRIMARY);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public void beforeSendMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
+      if (blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+        if (message instanceof UpdateOperation.UpdateMessage) {
+          logger.info(
+              "TestDistributionMessageObserver.beforeSendMessage sending 
signal to stop wait on "
+                  + CONTINUE_DEPOSE_PRIMARY);
+          blackboard.signalGate(CONTINUE_DEPOSE_PRIMARY);
+        }
+      }
+    }
+  }
 }

Reply via email to