This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f57745ce6 GEODE-10242: Colocated buckets share primary move lock
(#7630)
6f57745ce6 is described below
commit 6f57745ce65583d49459d5cb120985d3b72e26af
Author: Eric Shu <[email protected]>
AuthorDate: Tue May 3 10:48:00 2022 -0700
GEODE-10242: Colocated buckets share primary move lock (#7630)
* All colocated buckets now share the same primaryMoveReadWriteLock.
When parent bucket is being moved, no operations will be executed on
child buckets as well. So moving primary for all colocated buckets
shold be faster, and there is no need to hold parent locks anymore.
* Added.a dunit test to validate the fix works.
---
.../apache/geode/internal/cache/BucketAdvisor.java | 43 +++---
.../apache/geode/internal/cache/BucketRegion.java | 39 +----
.../geode/internal/cache/BucketAdvisorTest.java | 32 ++---
.../geode/internal/cache/wan/WANTestBase.java | 13 +-
.../ParallelWANConflationDistributedTest.java | 159 +++++++++++++++++++++
5 files changed, 210 insertions(+), 76 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 2b70f868d2..e6dcd3fb8f 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -142,15 +142,15 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
* A read/write lock to prevent making this bucket not primary while a write
is in progress on the
* bucket.
*/
- private final ReadWriteLock primaryMoveReadWriteLock = new
ReentrantReadWriteLock();
- private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
- private final Lock primaryMoveWriteLock =
primaryMoveReadWriteLock.writeLock();
+ private final ReadWriteLock primaryMoveReadWriteLock;
+ private final Lock primaryMoveReadLock;
+ private final Lock primaryMoveWriteLock;
/**
* The advisor for the bucket region that we are colocated with, if this
region is a colocated
* region.
*/
- private BucketAdvisor parentAdvisor;
+ private final BucketAdvisor parentAdvisor;
/**
* The member that is responsible for choosing the primary for this bucket.
While this field is
@@ -180,7 +180,15 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
pRegion = this.regionAdvisor.getPartitionedRegion();
redundancyTracker =
new BucketRedundancyTracker(pRegion.getRedundantCopies(),
pRegion.getRedundancyTracker());
- resetParentAdvisor(bucket.getId());
+ parentAdvisor = getParentAdvisor(bucket.getId());
+
+ if (parentAdvisor == null) {
+ primaryMoveReadWriteLock = new ReentrantReadWriteLock();
+ } else {
+ primaryMoveReadWriteLock = parentAdvisor.primaryMoveReadWriteLock;
+ }
+ primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
+ primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
}
public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor
regionAdvisor) {
@@ -189,7 +197,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor
{
return advisor;
}
- private void resetParentAdvisor(int bucketId) {
+ private BucketAdvisor getParentAdvisor(int bucketId) {
PartitionedRegion colocatedRegion =
ColocationHelper.getColocatedRegion(pRegion);
if (colocatedRegion != null) {
if (colocatedRegion.isFixedPartitionedRegion()) {
@@ -197,18 +205,15 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
if (fpas != null) {
for (FixedPartitionAttributesImpl fpa : fpas) {
if (fpa.hasBucket(bucketId)) {
- parentAdvisor =
-
colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID());
- break;
+ return
colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID());
}
}
}
} else {
- parentAdvisor =
colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
+ return colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
}
- } else {
- parentAdvisor = null;
}
+ return null;
}
private void assignStartingBucketAdvisorIfFixedPartitioned() {
@@ -240,19 +245,6 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
return primaryMoveReadLock;
}
- /**
- * Returns the lock that prevents the parent's primary from moving while
active writes are in
- * progress. This should be locked before checking if the local bucket is
primary.
- *
- * @return the lock for in-progress write operations
- */
- Lock getParentPrimaryMoveReadLock() {
- if (parentAdvisor != null) {
- return parentAdvisor.getPrimaryMoveReadLock();
- }
- return null;
- }
-
/**
* Try to lock the primary bucket to make sure no operation is on-going at
current bucket.
*
@@ -899,6 +891,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor
{
releasePrimaryLock();
// this was a deposePrimary call so we need to depose children as well
deposePrimaryForColocatedChildren();
+
if (pRegion.isFixedPartitionedRegion()) {
deposeOtherPrimaryBucketForFixedPartition();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 421a854498..ffb1109115 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -799,7 +799,7 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
}
/**
- * lock this bucket and, if present, its colocated "parent"
+ * lock this bucket
*
* @param tryLock - whether to use tryLock (true) or a blocking lock (false)
* @return true if locks were obtained and are still held
@@ -832,41 +832,20 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
private boolean lockPrimaryStateReadLock(boolean tryLock) {
Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
- Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
for (;;) {
boolean interrupted = Thread.interrupted();
try {
// Get the lock. If we have to wait here, it's because
// this VM is actively becoming "not primary". We don't want
// to throw an exception until this VM is actually no longer
- // primary, so we wait here for not primary to complete. See bug #39963
- if (parentLock != null) {
- if (tryLock) {
- boolean locked = parentLock.tryLock();
- if (!locked) {
- return false;
- }
- } else {
- parentLock.lockInterruptibly();
- }
- if (tryLock) {
- boolean locked = primaryMoveReadLock.tryLock();
- if (!locked) {
- parentLock.unlock();
- return false;
- }
- } else {
- primaryMoveReadLock.lockInterruptibly();
+ // primary, so we wait here for not primary to complete.
+ if (tryLock) {
+ boolean locked = primaryMoveReadLock.tryLock();
+ if (!locked) {
+ return false;
}
} else {
- if (tryLock) {
- boolean locked = primaryMoveReadLock.tryLock();
- if (!locked) {
- return false;
- }
- } else {
- primaryMoveReadLock.lockInterruptibly();
- }
+ primaryMoveReadLock.lockInterruptibly();
}
break; // success
} catch (InterruptedException e) {
@@ -886,10 +865,6 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
public void doUnlockForPrimary() {
Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
primaryMoveReadLock.unlock();
- Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
- if (parentLock != null) {
- parentLock.unlock();
- }
}
/**
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 817386cb7f..c406092c85 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -19,7 +19,6 @@ import static org.apache.geode.cache.Region.SEPARATOR;
import static
org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
@@ -33,7 +32,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.apache.geode.cache.PartitionAttributes;
@@ -43,10 +42,10 @@ import
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
-public class BucketAdvisorTest {
+class BucketAdvisorTest {
@Test
- public void shouldBeMockable() throws Exception {
+ void shouldBeMockable() throws Exception {
BucketAdvisor mockBucketAdvisor = mock(BucketAdvisor.class);
InternalDistributedMember mockInternalDistributedMember =
mock(InternalDistributedMember.class);
@@ -58,7 +57,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
+ void
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
InternalCache mockCache = mock(InternalCache.class);
ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -86,7 +85,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
{
+ void
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
{
InternalCache mockCache = mock(InternalCache.class);
ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -114,7 +113,7 @@ public class BucketAdvisorTest {
}
@Test
- public void volunteerForPrimaryIgnoresMissingPrimaryElector() {
+ void volunteerForPrimaryIgnoresMissingPrimaryElector() {
DistributionManager distributionManager = mock(DistributionManager.class);
when(distributionManager.getId()).thenReturn(new
InternalDistributedMember("localhost", 321));
@@ -153,12 +152,13 @@ public class BucketAdvisorTest {
mock(BucketAdvisor.VolunteeringDelegate.class);
advisorSpy.setVolunteeringDelegate(volunteeringDelegate);
advisorSpy.initializePrimaryElector(missingElectorId);
- assertEquals(missingElectorId, advisorSpy.getPrimaryElector());
+ assertThat(missingElectorId).isEqualTo(advisorSpy.getPrimaryElector());
advisorSpy.volunteerForPrimary();
verify(volunteeringDelegate).volunteerForPrimary();
}
- BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String,
Boolean> shadowBuckets) {
+ BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(
+ Map<String, Boolean> shadowBuckets) {
DistributionManager distributionManager = mock(DistributionManager.class);
when(distributionManager.getId()).thenReturn(new
InternalDistributedMember("localhost", 321));
@@ -180,7 +180,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+ void
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
Map<String, Boolean> buckets = of(SEPARATOR + "b1", false, SEPARATOR +
"b2", true);
BucketAdvisor bucketAdvisor =
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -190,7 +190,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
{
+ void
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
{
Map<String, Boolean> buckets =
of(SEPARATOR + "b1", false, SEPARATOR + "b2", false, SEPARATOR + "b3",
false);
BucketAdvisor bucketAdvisor =
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -201,7 +201,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+ void
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
Map<String, Boolean> buckets = of(SEPARATOR + "b1", false);
BucketAdvisor bucketAdvisor =
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -217,7 +217,7 @@ public class BucketAdvisorTest {
}
@Test
- public void isShadowBucketDestroyedShouldReturnCorrectly() {
+ void isShadowBucketDestroyedShouldReturnCorrectly() {
Map<String, Boolean> buckets = of(SEPARATOR + "b1", true, SEPARATOR +
"b2", false);
BucketAdvisor bucketAdvisor =
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -230,7 +230,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+ void
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
DistributionManager distributionManager = mock(DistributionManager.class);
when(distributionManager.getId()).thenReturn(new
InternalDistributedMember("localhost", 321));
@@ -252,7 +252,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
{
+ void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
{
DistributionManager distributionManager = mock(DistributionManager.class);
InternalDistributedMember memberId = new
InternalDistributedMember("localhost", 321);
@@ -282,7 +282,7 @@ public class BucketAdvisorTest {
}
@Test
- public void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
{
+ void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
{
DistributionManager distributionManager = mock(DistributionManager.class);
InternalDistributedMember memberId = new
InternalDistributedMember("localhost", 321);
InternalDistributedMember memberId2 = new
InternalDistributedMember("localhost", 323);
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 16ea5f6b91..ce7d19abc5 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -770,10 +770,17 @@ public class WANTestBase extends DistributedTestCase {
public static void createCustomerOrderShipmentPartitionedRegion(String
senderIds,
Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
+ createCustomerOrderShipmentPartitionedRegion(senderIds, redundantCopies,
totalNumBuckets,
+ offHeap, RegionShortcut.PARTITION);
+ }
+
+ public static void createCustomerOrderShipmentPartitionedRegion(String
senderIds,
+ Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap,
+ RegionShortcut regionShortcut) {
IgnoredException exp =
addIgnoredException(ForceReattemptException.class.getName());
try {
- RegionFactory<?, ?> fact =
cache.createRegionFactory(RegionShortcut.PARTITION);
+ RegionFactory<?, ?> fact = cache.createRegionFactory(regionShortcut);
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
@@ -795,7 +802,7 @@ public class WANTestBase extends DistributedTestCase {
paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
.setColocatedWith(customerRegionName)
.setPartitionResolver(new
CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
- fact = cache.createRegionFactory(RegionShortcut.PARTITION);
+ fact = cache.createRegionFactory(regionShortcut);
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
@@ -813,7 +820,7 @@ public class WANTestBase extends DistributedTestCase {
paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets)
.setColocatedWith(orderRegionName)
.setPartitionResolver(new
CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
- fact = cache.createRegionFactory(RegionShortcut.PARTITION);
+ fact = cache.createRegionFactory(regionShortcut);
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
index f56c12847e..8e732c57f0 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDistributedTest.java
@@ -21,23 +21,45 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.stream.Stream;
+import org.apache.logging.log4j.Logger;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.control.RebalanceResults;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.locks.DLockReleaseProcessor;
+import org.apache.geode.internal.cache.UpdateOperation;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Customer;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage;
import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
import org.apache.geode.test.junit.categories.WanTest;
@Category({WanTest.class})
public class ParallelWANConflationDistributedTest extends WANTestBase {
+ @Rule
+ public DistributedBlackboard blackboard = new DistributedBlackboard();
+
private static final long serialVersionUID = 1L;
+ private static final Logger logger = LogService.getLogger();
+ private static final String DEPOSE_PRIMARY_MESSAGE_RECEIVED =
"received_deposePrimaryMessage";
+ private static final String CONTINUE_PUT_OP = "continue_put_op";
+ private static final String CONTINUE_DEPOSE_PRIMARY =
"continue_depose_primary";
public ParallelWANConflationDistributedTest() {
super();
@@ -384,6 +406,103 @@ public class ParallelWANConflationDistributedTest extends
WANTestBase {
validateColocatedRegionContents(custKeyValues, orderKeyValues,
shipmentKeyValues);
}
+ @Test
+ public void
wanEventNotLostWhenDoOperationOnColocatedRegionsDuringRebalance() throws
Exception {
+ blackboard.initBlackboard();
+
+ initialSetUp();
+ int numberOfBuckets = 2;
+ int numberOfPuts = 101;
+ int newId = numberOfPuts + 1;
+
+ createSendersNoConflation();
+
+ Stream.of(vm4, vm5, vm6)
+ .forEach(server ->
server.invoke(this::addDistributionMessageObserver));
+
+ vm4.invoke(
+ () -> createCustomerOrderShipmentPartitionedRegion("ln", 1,
numberOfBuckets, isOffHeap(),
+ RegionShortcut.PARTITION_PROXY));
+ vm5.invoke(
+ () -> createCustomerOrderShipmentPartitionedRegion("ln", 1,
numberOfBuckets, isOffHeap()));
+
+ startSenderInVMs("ln", vm4, vm5, vm6);
+ vm2.invoke(
+ () -> createCustomerOrderShipmentPartitionedRegion(null, 0,
numberOfBuckets, isOffHeap()));
+ vm3.invoke(
+ () -> createCustomerOrderShipmentPartitionedRegion(null, 0,
numberOfBuckets, isOffHeap()));
+
+ vm4.invoke(() -> putCustomerPartitionedRegion(numberOfPuts));
+ vm4.invoke(() -> putOrderPartitionedRegion(numberOfPuts));
+ vm4.invoke(() -> putShipmentPartitionedRegion(numberOfPuts));
+
+ vm6.invoke(
+ () -> createCustomerOrderShipmentPartitionedRegion("ln", 1,
numberOfBuckets, isOffHeap()));
+
+ AsyncInvocation<?> asyncInvocation = vm4.invokeAsync(this::doRebalance);
+
+ if (!blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+ blackboard.waitForGate(DEPOSE_PRIMARY_MESSAGE_RECEIVED);
+ }
+ vm4.invokeAsync(() -> putInShipmentRegion(newId));
+
+ if (!blackboard.isGateSignaled(CONTINUE_PUT_OP)) {
+ blackboard.waitForGate(CONTINUE_PUT_OP);
+ }
+ vm4.invokeAsync(() -> putInCustomerRegion(newId));
+ vm4.invokeAsync(() -> putInOrderRegion(newId));
+
+ asyncInvocation.get();
+
+ vm2.invoke(() -> verifyContent(newId));
+ vm3.invoke(() -> verifyContent(newId));
+ }
+
+ private void addDistributionMessageObserver() {
+ DistributionMessageObserver.setInstance(new
TestDistributionMessageObserver());
+ }
+
+ private RebalanceResults doRebalance() throws Exception {
+ ResourceManager manager = cache.getResourceManager();
+ return
manager.createRebalanceFactory().includeRegions(null).start().getResults();
+ }
+
+ private void putInCustomerRegion(int id) {
+ CustId custid = new CustId(id);
+ Customer customer = new Customer("name" + id, "Address" + id);
+ customerRegion.put(custid, customer);
+ }
+
+ private void putInOrderRegion(int id) {
+ CustId custid = new CustId(id);
+ int oid = id + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("ORDER" + oid);
+ orderRegion.put(orderId, order);
+ }
+
+ private void putInShipmentRegion(int id) {
+ CustId custid = new CustId(id);
+ int oid = id + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid);
+ shipmentRegion.put(shipmentId, shipment);
+ }
+
+ private void verifyContent(int id) {
+ CustId custid = new CustId(id);
+ int oid = id + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+
+ await().untilAsserted(() ->
assertThat(orderRegion.get(orderId)).isNotNull());
+ await().untilAsserted(() ->
assertThat(shipmentRegion.get(shipmentId)).isNotNull());
+ await().untilAsserted(() ->
assertThat(customerRegion.get(custid)).isNotNull());
+ }
+
protected void validateColocatedRegionContents(Map<?, ?> custKeyValues,
Map<?, ?> orderKeyValues,
Map<?, ?> shipmentKeyValues) {
vm2.invoke(() -> validateRegionSize(WANTestBase.customerRegionName,
custKeyValues.size()));
@@ -508,4 +627,44 @@ public class ParallelWANConflationDistributedTest extends
WANTestBase {
vm7.invoke(() -> createSender("ln", 2, true, 100, 2, true, false, null,
true));
}
+ private class TestDistributionMessageObserver extends
DistributionMessageObserver {
+ public void beforeProcessMessage(ClusterDistributionManager dm,
DistributionMessage message) {
+ if (message instanceof DeposePrimaryBucketMessage) {
+ logger.info(
+ "TestDistributionMessageObserver.beforeProcessMessage about to
signal received_deposePrimaryMessage gate",
+ new Exception());
+ blackboard.signalGate(DEPOSE_PRIMARY_MESSAGE_RECEIVED);
+ logger.info(
+ "TestDistributionMessageObserver.beforeProcessMessage done signal
received_deposePrimaryMessage gate");
+ } else if (message instanceof
DLockReleaseProcessor.DLockReleaseReplyMessage
+ && blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+ try {
+ logger.info(
+ "TestDistributionMessageObserver.beforeSendMessage about to
signal on continue_put_op");
+ blackboard.signalGate(CONTINUE_PUT_OP);
+
+ logger.info(
+ "TestDistributionMessageObserver.beforeSendMessage waits for
signal on "
+ + CONTINUE_DEPOSE_PRIMARY);
+ blackboard.waitForGate(CONTINUE_DEPOSE_PRIMARY);
+ logger.info(
+ "TestDistributionMessageObserver.beforeSendMessage done wait for
signal on "
+ + CONTINUE_DEPOSE_PRIMARY);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void beforeSendMessage(ClusterDistributionManager dm,
DistributionMessage message) {
+ if (blackboard.isGateSignaled(DEPOSE_PRIMARY_MESSAGE_RECEIVED)) {
+ if (message instanceof UpdateOperation.UpdateMessage) {
+ logger.info(
+ "TestDistributionMessageObserver.beforeSendMessage sending
signal to stop wait on "
+ + CONTINUE_DEPOSE_PRIMARY);
+ blackboard.signalGate(CONTINUE_DEPOSE_PRIMARY);
+ }
+ }
+ }
+ }
}