[ https://issues.apache.org/jira/browse/GEODE-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095355#comment-17095355 ]

ASF GitHub Bot commented on GEODE-7678:
---------------------------------------

jujoramos commented on a change in pull request #4987:
URL: https://github.com/apache/geode/pull/4987#discussion_r417216163



##########
File path: geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
##########
@@ -302,4 +349,64 @@ public void afterDestroy(final EntryEvent<String, Integer> event) {
       errorCollector.checkThat(event.getNewValue(), nullValue());
     }
   }
+
+  protected class ClearCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionClear(RegionEvent<String, Integer> event) {
+
+      sharedCountersRule.increment(CLEAR);
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR));
+      errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+    }
+  }
+
+  protected class RegionDestroyCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionDestroy(final RegionEvent<String, Integer> event) {
+      sharedCountersRule.increment(REGION_DESTROY);
+
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }

Review comment:
       This code is duplicated within the test class; maybe it would be better to extract it into its own method?
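       For example, a rough sketch of that extraction (the helper name is illustrative and not part of the PR):

```java
// Hypothetical helper on the shared test class; both listeners could delegate to it.
private void checkEventMember(final RegionEvent<String, Integer> event) {
  if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
    if (event.isOriginRemote()) {
      errorCollector.checkThat(event.getDistributedMember(),
          not(cacheRule.getSystem().getDistributedMember()));
    } else {
      errorCollector.checkThat(event.getDistributedMember(),
          equalTo(cacheRule.getSystem().getDistributedMember()));
    }
  }
}
```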

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();

Review comment:
       You can delete this field as it's not used.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
##########
@@ -38,22 +45,57 @@
 @SuppressWarnings("serial")
 public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest {
 
-  @Parameters(name = "{index}: redundancy={0}")
-  public static Iterable<Integer> data() {
-    return Arrays.asList(0, 3);
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {1, Boolean.FALSE},
+        {3, Boolean.TRUE},
+    });
   }
 
   @Parameter
   public int redundancy;
 
+  @Parameter(1)
+  public Boolean withData;
+
   @Override
   protected Region<String, Integer> createRegion(final String name,
       final CacheListener<String, Integer> listener) {
+    LogService.getLogger()
+        .info("Params [Redundancy: " + redundancy + " withData:" + withData + 
"]");
     PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>();
     paf.setRedundantCopies(redundancy);
 
     RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
-    regionFactory.addCacheListener(listener);
+    if (listener != null) {
+      regionFactory.addCacheListener(listener);
+    }
+    regionFactory.setDataPolicy(DataPolicy.PARTITION);
+    regionFactory.setPartitionAttributes(paf.create());
+
+    return regionFactory.create(name);

Review comment:
       This code is duplicated within the test class; maybe it would be better to extract it into its own method?
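       For example, a rough sketch of such a helper, based only on the code shown in this hunk (name and placement are illustrative):

```java
// Hypothetical extraction of the shared partitioned-region setup.
private Region<String, Integer> createPartitionedRegion(final String name,
    final CacheListener<String, Integer> listener, final int redundantCopies) {
  PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>();
  paf.setRedundantCopies(redundantCopies);

  RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
  if (listener != null) {
    regionFactory.addCacheListener(listener);
  }
  regionFactory.setDataPolicy(DataPolicy.PARTITION);
  regionFactory.setPartitionAttributes(paf.create());
  return regionFactory.create(name);
}
```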

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    // properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await().atMost(10, SECONDS)

Review comment:
       Is there any reason for overriding the default `GeodeAwaitility` timeout? Maybe we should keep the default to avoid flakiness in cases where there's resource contention while the tests are running.
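       A sketch of what that would look like, simply dropping the explicit `atMost`:

```java
// Sketch: rely on GeodeAwaitility's default timeout instead of hard-coding 10 seconds.
private void verifyRegionSize(boolean isClient, int expectedNum) {
  GeodeAwaitility.await()
      .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
}
```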

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    // properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await().atMost(10, SECONDS)
+        .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void stopServers() {
+    List<CacheServer> cacheServers = getCache().getCacheServers();
+    for (CacheServer server : cacheServers) {
+      server.stop();
+    }
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);

Review comment:
       Can we throw an exception here instead of using the `fail` method?
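       One possible shape for that, as a sketch only (the helper and exception type are suggestions, not part of the PR):

```java
// Hypothetical helper: map a persistent shortcut to its non-persistent counterpart,
// throwing instead of calling fail() for an unexpected value.
private RegionShortcut toNonPersistentShortcut(final RegionShortcut shortcut) {
  if (!shortcut.isPersistent()) {
    return shortcut;
  }
  switch (shortcut) {
    case PARTITION_PERSISTENT:
      return RegionShortcut.PARTITION;
    case PARTITION_PERSISTENT_OVERFLOW:
      return RegionShortcut.PARTITION_OVERFLOW;
    case PARTITION_REDUNDANT_PERSISTENT:
      return RegionShortcut.PARTITION_REDUNDANT;
    case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW:
      return RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
    default:
      throw new IllegalStateException("Wrong region type: " + shortcut);
  }
}
```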

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    // properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await().atMost(10, SECONDS)
+        .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void stopServers() {
+    List<CacheServer> cacheServers = getCache().getCacheServers();
+    for (CacheServer server : cacheServers) {
+      server.stop();
+    }
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    client2.invoke(() -> verifyRegionSize(true, expectedNum));
+  }
+
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(4);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void clearFromClientWithAccessorAsServer() {
+    dataStore1.invoke(this::stopServers);
+    dataStore2.invoke(this::stopServers);
+    dataStore3.invoke(this::stopServers);
+
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void normalClearFromDataStoreWithClientInterest() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test(expected = AssertionError.class)

Review comment:
       Can we use `assertThatThrownBy` instead?
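       Roughly like the following, assuming the expected `AssertionError` is the one surfaced by the async clear invocation (uses `org.assertj.core.api.Assertions.assertThatThrownBy`):

```java
// Sketch: assert on the specific call instead of @Test(expected = AssertionError.class).
assertThatThrownBy(ds1ClearAsync::await).isInstanceOf(AssertionError.class);
```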

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipchange = false;
+
+  public ClearPartitionedRegion(PartitionedRegion partitionedRegion) {
+    this.partitionedRegion = partitionedRegion;
+    partitionedRegion.getDistributionManager()
+        .addMembershipListener(new ClearPartitionedRegionListener());
+  }
+
+  public boolean isLockedForListenerAndClientNotification() {
+    return lockForListenerAndClientNotification.isLocked();
+  }
+
+  void acquireDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+      throw e;
+    }
+  }
+
+  void releaseDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+    } catch (Exception ex) {
+      logger.warn("Caught exception while unlocking clear distributed lock", 
ex.getMessage());
+    }
+  }
+
+  void obtainLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+    obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
+  }
+
+  void releaseLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+    releaseClearLockLocal();
+  }
+
+  void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    clearRegionLocal(regionEvent, cacheWrite, null);
+    sendClearRegionMessage(regionEvent,
+        ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR);
+  }
+
+  private void waitForPrimary() {
+    boolean retry;
+    int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */;
+    PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
+    do {
+      retry = false;
+      for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
+          .getAllLocalBucketRegions()) {
+        if (!bucketRegion.getBucketAdvisor().hasPrimary()) {
+          if (retryTimer.overMaximum()) {
+            PRHARedundancyProvider.timedOut(partitionedRegion, null, null,
+                "do clear. Missing primary bucket",
+                retryTime);
+          }
+          retryTimer.waitForBucketsRecovery();
+          retry = true;
+        }
+      }
+    } while (retry);
+  }
+
+  public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    // Synchronized to handle the requester departure.
+    synchronized (lockForListenerAndClientNotification) {
+      if (partitionedRegion.getDataStore() != null) {
+        partitionedRegion.getDataStore().lockBucketCreationForRegionClear();
+        try {
+          boolean retry;
+          do {
+            retry = false;
+            for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+                .getAllLocalPrimaryBucketRegions()) {
+              if (localPrimaryBucketRegion.size() > 0) {
+                localPrimaryBucketRegion.clear();
+              }
+            }
+            if (membershipchange) {
+              membershipchange = false;
+              retry = true;
+              waitForPrimary();
+            }
+          } while (retry);
+
+          doAfterClear(regionEvent);
+        } finally {
+          partitionedRegion.getDataStore().unLockBucketCreationForRegionClear();
+        }
+      } else {
+        // Non data-store with client queue and listener
+        doAfterClear(regionEvent);
+      }
+    }
+  }
+
+  private void doAfterClear(RegionEventImpl regionEvent) {
+    if (partitionedRegion.hasAnyClientsInterested()) {
+      notifyClients(regionEvent);
+    }
+
+    if (partitionedRegion.hasListener()) {
+      partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+    }
+  }
+
+  void notifyClients(RegionEventImpl event) {
+    // Set client routing information into the event
+    // The clear operation in case of PR is distributed differently
+    // hence the FilterRoutingInfo is set here instead of
+    // DistributedCacheOperation.distribute().
+    event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+    if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion
+        .isUsedForPartitionedRegionAdmin()
+        && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion
+            .isUsedForParallelGatewaySenderQueue()) {
+
+      FilterRoutingInfo localCqFrInfo =
+          partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event,
+              FilterProfile.NO_PROFILES, Collections.emptySet());
+
+      FilterRoutingInfo localCqInterestFrInfo =
+          partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event);
+
+      if (localCqInterestFrInfo != null) {
+        event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo());
+      }
+    }
+    partitionedRegion.notifyBridgeClients(event);
+  }
+
+  protected void obtainClearLockLocal(InternalDistributedMember requester) {
+    synchronized (lockForListenerAndClientNotification) {
+      // Check if the member is still part of the distributed system
+      if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) {
+        return;
+      }
+
+      lockForListenerAndClientNotification.setLocked(requester);
+      if (partitionedRegion.getDataStore() != null) {
+        for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+            .getAllLocalPrimaryBucketRegions()) {
+          try {
+            localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(),
+                partitionedRegion.getMyId(), null);
+          } catch (Exception ex) {
+            partitionedRegion.checkClosed();
+          }
+        }
+      }
+    }
+  }
+
+  protected void releaseClearLockLocal() {
+    synchronized (lockForListenerAndClientNotification) {
+      if (lockForListenerAndClientNotification.getLockRequester() == null) {
+        // The member has been left.
+        return;
+      }
+      try {
+        if (partitionedRegion.getDataStore() != null) {
+
+          for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+              .getAllLocalPrimaryBucketRegions()) {
+            try {
+              localPrimaryBucketRegion.releaseLockLocallyForClear(null);
+            } catch (Exception ex) {
+              logger.debug(
+                  "Unable to acquire clear lock for bucket region " + 
localPrimaryBucketRegion
+                      .getName(),
+                  ex.getMessage());
+              partitionedRegion.checkClosed();
+            }
+          }
+        }
+      } finally {
+        lockForListenerAndClientNotification.setUnLocked();
+      }
+    }
+  }
+
+  private void sendClearRegionMessage(RegionEventImpl event,
+      ClearPartitionedRegionMessage.OperationType op) {
+    RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
+    eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
+
+    boolean retry = true;
+    while (retry) {
+      retry = attemptToSendClearRegionMessage(event, op);
+    }
+  }
+
+  private boolean attemptToSendClearRegionMessage(RegionEventImpl event,
+      ClearPartitionedRegionMessage.OperationType op) {
+    if (partitionedRegion.getPRRoot() == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Partition region {} failed to initialize. Remove its profile from 
remote members.",
+            this);
+      }
+      new UpdateAttributesProcessor(partitionedRegion, true).distribute(false);
+      return false;
+    }
+    final HashSet configRecipients =
+        new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
+
+    // It's possible this instance has not been initialized
+    // or hasn't gotten through initialize() far enough to have
+    // sent a CreateRegionProcessor message, bug 36048
+    try {
+      final PartitionRegionConfig prConfig =
+          partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier());
+
+      if (prConfig != null) {
+        // Fix for bug#34621 by Tushar
+        Iterator itr = prConfig.getNodes().iterator();
+        while (itr.hasNext()) {
+          InternalDistributedMember idm = ((Node) itr.next()).getMemberId();
+          if (!idm.equals(partitionedRegion.getMyId())) {
+            configRecipients.add(idm);
+          }
+        }
+      }
+    } catch (CancelException ignore) {
+      // ignore
+    }
+
+    try {
+      ClearPartitionedRegionMessage.ClearPartitionedRegionResponse resp =
+          new ClearPartitionedRegionMessage.ClearPartitionedRegionResponse(
+              partitionedRegion.getSystem(),
+              configRecipients);
+      ClearPartitionedRegionMessage clearPartitionedRegionMessage =
+          new ClearPartitionedRegionMessage(configRecipients, partitionedRegion, resp, op, event);
+      clearPartitionedRegionMessage.send();
+
+      resp.waitForRepliesUninterruptibly();
+
+    } catch (ReplyException e) {
+      if (e.getRootCause() instanceof ForceReattemptException) {
+        return true;
+      }
+      logger.warn(
+          "PartitionedRegion#sendClearRegionMessage: Caught exception during 
ClearRegionMessage send and waiting for response",
+          e);
+    }
+    return false;
+  }
+
+  void doClear(RegionEventImpl regionEvent, boolean cacheWrite,
+      PartitionedRegion partitionedRegion) {
+    String lockName = "_clearOperation" + partitionedRegion.getDisplayName();
+
+    try {
+      // distributed lock to make sure only one clear op is in progress in the cluster.
+      acquireDistributedClearLock(lockName);
+
+      // Force all primary buckets to be created before clear.
+      PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+
+      // do cacheWrite
+      partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
+
+      // Check if there are any listeners or clients interested. If so, then clear write
+      // locks need to be taken on all local and remote primary buckets in order to
+      // preserve the ordering of client events (for concurrent operations on the region).
+      boolean acquireClearLockForClientNotification =
+          (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener());
+      if (acquireClearLockForClientNotification) {
+        obtainLockForClear(regionEvent);
+      }
+      try {
+        clearRegion(regionEvent, cacheWrite, null);
+      } finally {
+        if (acquireClearLockForClientNotification) {
+          releaseLockForClear(regionEvent);
+        }
+      }
+
+    } finally {
+      releaseDistributedClearLock(lockName);
+    }
+  }
+
+  void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
+    if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) {
+      synchronized (lockForListenerAndClientNotification) {
+        if (lockForListenerAndClientNotification.getLockRequester() != null) {
+          releaseClearLockLocal();
+        }
+      }
+    }
+  }
+
+  class LockForListenerAndClientNotification {
+
+    private volatile boolean locked = false;
+
+    private volatile InternalDistributedMember lockRequester;

Review comment:
       I think we don't need the `volatile` keyword here, as the fields are `private` and all methods (both accessors and setters) are already `synchronized`.
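       A sketch of the suggested change (the accessor/setter bodies are assumed here for illustration, since they are outside this hunk):

```java
// Non-volatile fields; visibility is provided by the synchronized accessors/setters
// and the synchronized (lockForListenerAndClientNotification) blocks in the callers.
class LockForListenerAndClientNotification {
  private boolean locked = false;
  private InternalDistributedMember lockRequester;

  synchronized void setLocked(InternalDistributedMember member) {
    locked = true;
    lockRequester = member;
  }

  synchronized void setUnLocked() {
    locked = false;
    lockRequester = null;
  }

  synchronized InternalDistributedMember getLockRequester() {
    return lockRequester;
  }

  synchronized boolean isLocked() {
    return locked;
  }
}
```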

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    // properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await().atMost(10, SECONDS)
+        .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void stopServers() {
+    List<CacheServer> cacheServers = getCache().getCacheServers();
+    for (CacheServer server : cacheServers) {
+      server.stop();
+    }
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    client2.invoke(() -> verifyRegionSize(true, expectedNum));
+  }
+
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(4);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void clearFromClientWithAccessorAsServer() {
+    dataStore1.invoke(this::stopServers);
+    dataStore2.invoke(this::stopServers);
+    dataStore3.invoke(this::stopServers);
+
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void normalClearFromDataStoreWithClientInterest() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers()
+      throws Exception {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+        testHookToKillMemberCallingClearBeforeMessageProcessed()));
+
+    AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+    getBlackboard().waitForGate("CacheClose", 30, SECONDS);
+
+    dataStore1.invoke(() -> getCache().close());
+    getBlackboard().signalGate("CacheClosed");
+
+    // This should not be blocked.
+    dataStore2.invoke(() -> feed(false));
+    dataStore3.invoke(() -> feed(false));
+
+    dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+    dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+    ds1ClearAsync.await();
+  }
+
+  @Test
+  public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembersAfterMessageProcessed()
+      throws Exception {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+        testHookToKillMemberCallingClearAfterMessageProcessed()));
+
+    AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+    getBlackboard().waitForGate("CacheClose", 30, SECONDS);
+
+    dataStore1.invoke(() -> getCache().close());
+    getBlackboard().signalGate("CacheClosed");
+
+    // This should not be blocked.
+    dataStore2.invoke(() -> feed(false));
+    dataStore3.invoke(() -> feed(false));
+
+    dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+    dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+    ds1ClearAsync.await();
+  }
+
+
+  private static class CountingCacheListener extends CacheListenerAdapter {
+    private final AtomicInteger clears = new AtomicInteger();
+
+    @Override
+    public void afterRegionClear(RegionEvent event) {
+      Region region = event.getRegion();

Review comment:
       Unused variable.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    // properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await().atMost(10, SECONDS)
+        .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void stopServers() {
+    List<CacheServer> cacheServers = getCache().getCacheServers();
+    for (CacheServer server : cacheServers) {
+      server.stop();
+    }
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    client2.invoke(() -> verifyRegionSize(true, expectedNum));
+  }
+
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(4);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void clearFromClientWithAccessorAsServer() {
+    dataStore1.invoke(this::stopServers);
+    dataStore2.invoke(this::stopServers);
+    dataStore3.invoke(this::stopServers);
+
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void normalClearFromDataStoreWithClientInterest() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers()
+      throws Exception {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+        testHookToKillMemberCallingClearBeforeMessageProcessed()));
+
+    AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+    getBlackboard().waitForGate("CacheClose", 30, SECONDS);

Review comment:
       +1

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {

Review comment:
       Can we change the name to something more meaningful? Maybe `PartitionedRegionClearer`, or something that lets the reader know that the class holds the actual implementation for clearing a `PartitionedRegion`.
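       Purely as a sketch of the suggested rename (constructor as quoted in the fuller hunks below; the body would stay the same):

```java
public class PartitionedRegionClearer {

  private final PartitionedRegion partitionedRegion;

  public PartitionedRegionClearer(PartitionedRegion partitionedRegion) {
    this.partitionedRegion = partitionedRegion;
    partitionedRegion.getDistributionManager()
        .addMembershipListener(new ClearPartitionedRegionListener());
  }

  // ... remaining methods unchanged; related names (e.g. the membership
  // listener and message classes) would presumably follow the new naming.
}
```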

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipchange = false;

Review comment:
       +1

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
##########
@@ -980,6 +980,14 @@ protected void lockBucketCreationAndVisit(BucketVisitor visitor) {
     }
   }
 
+  protected void lockBucketCreationForRegionClear() {
+    bucketCreationLock.writeLock().lock();
+  }
+
+  protected void unLockBucketCreationForRegionClear() {

Review comment:
       Should probably be `unlockBucketCreationForRegionClear`.
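       For example (a sketch only; the unlock body is assumed to mirror the lock shown in this hunk):

```java
  protected void lockBucketCreationForRegionClear() {
    bucketCreationLock.writeLock().lock();
  }

  // "unlock" camel-cased as a single word, matching the usual Java naming style.
  protected void unlockBucketCreationForRegionClear() {
    bucketCreationLock.writeLock().unlock();
  }
```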

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
##########
@@ -834,7 +834,7 @@ public InternalDistributedMember adviseFixedPrimaryPartitionDataStore(final int
     });
   }
 
-  Set adviseAllServersWithInterest() {
+  public Set adviseAllServersWithInterest() {

Review comment:
       No need to make it `public`; it's only invoked from `PRTombstoneMessage`, which belongs to the same package.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipchange = false;
+
+  public ClearPartitionedRegion(PartitionedRegion partitionedRegion) {
+    this.partitionedRegion = partitionedRegion;
+    partitionedRegion.getDistributionManager()
+        .addMembershipListener(new ClearPartitionedRegionListener());
+  }
+
+  public boolean isLockedForListenerAndClientNotification() {
+    return lockForListenerAndClientNotification.isLocked();
+  }
+
+  void acquireDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+      throw e;
+    }
+  }
+
+  void releaseDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+    } catch (Exception ex) {
+      logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage());
+    }
+  }
+
+  void obtainLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+    obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
+  }
+
+  void releaseLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+    releaseClearLockLocal();
+  }
+
+  void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    clearRegionLocal(regionEvent, cacheWrite, null);
+    sendClearRegionMessage(regionEvent,
+        ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR);
+  }
+
+  private void waitForPrimary() {
+    boolean retry;
+    int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */;
+    PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
+    do {
+      retry = false;
+      for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
+          .getAllLocalBucketRegions()) {
+        if (!bucketRegion.getBucketAdvisor().hasPrimary()) {
+          if (retryTimer.overMaximum()) {
+            PRHARedundancyProvider.timedOut(partitionedRegion, null, null,
+                "do clear. Missing primary bucket",
+                retryTime);
+          }
+          retryTimer.waitForBucketsRecovery();
+          retry = true;
+        }
+      }
+    } while (retry);
+  }
+
+  public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {

Review comment:
       +1

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipchange = false;
+
+  public ClearPartitionedRegion(PartitionedRegion partitionedRegion) {
+    this.partitionedRegion = partitionedRegion;
+    partitionedRegion.getDistributionManager()
+        .addMembershipListener(new ClearPartitionedRegionListener());
+  }
+
+  public boolean isLockedForListenerAndClientNotification() {
+    return lockForListenerAndClientNotification.isLocked();
+  }
+
+  void acquireDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+      throw e;
+    }
+  }
+
+  void releaseDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+    } catch (Exception ex) {
+      logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage());

Review comment:
       I think you're missing a `{}` within the log message; otherwise the `ex.getMessage()` won't be shown.
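       With Log4j2 the extra argument is only rendered when the message contains a placeholder, so (as a sketch) either of these would surface the detail:

```java
// Current form: there is no {} for the second argument to bind to, so it is dropped.
logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage());

// Option 1: add a placeholder for the exception message.
logger.warn("Caught exception while unlocking clear distributed lock: {}", ex.getMessage());

// Option 2: pass the Throwable itself, which also logs the stack trace.
logger.warn("Caught exception while unlocking clear distributed lock", ex);
```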

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipchange = false;
+
+  public ClearPartitionedRegion(PartitionedRegion partitionedRegion) {
+    this.partitionedRegion = partitionedRegion;
+    partitionedRegion.getDistributionManager()
+        .addMembershipListener(new ClearPartitionedRegionListener());
+  }
+
+  public boolean isLockedForListenerAndClientNotification() {
+    return lockForListenerAndClientNotification.isLocked();
+  }
+
+  void acquireDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+      throw e;
+    }
+  }
+
+  void releaseDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+    } catch (Exception ex) {
+      logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage());
+    }
+  }
+
+  void obtainLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+    obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
+  }
+
+  void releaseLockForClear(RegionEventImpl event) {
+    sendClearRegionMessage(event,
+        ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+    releaseClearLockLocal();
+  }
+
+  void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    clearRegionLocal(regionEvent, cacheWrite, null);
+    sendClearRegionMessage(regionEvent,
+        ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR);
+  }
+
+  private void waitForPrimary() {
+    boolean retry;
+    int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */;
+    PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
+    do {
+      retry = false;
+      for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
+          .getAllLocalBucketRegions()) {
+        if (!bucketRegion.getBucketAdvisor().hasPrimary()) {
+          if (retryTimer.overMaximum()) {
+            PRHARedundancyProvider.timedOut(partitionedRegion, null, null,
+                "do clear. Missing primary bucket",
+                retryTime);
+          }
+          retryTimer.waitForBucketsRecovery();
+          retry = true;
+        }
+      }
+    } while (retry);
+  }
+
+  public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    // Synchronized to handle the requester departure.
+    synchronized (lockForListenerAndClientNotification) {
+      if (partitionedRegion.getDataStore() != null) {
+        partitionedRegion.getDataStore().lockBucketCreationForRegionClear();
+        try {
+          boolean retry;
+          do {
+            retry = false;
+            for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+                .getAllLocalPrimaryBucketRegions()) {
+              if (localPrimaryBucketRegion.size() > 0) {
+                localPrimaryBucketRegion.clear();
+              }
+            }
+            if (membershipchange) {
+              membershipchange = false;
+              retry = true;
+              waitForPrimary();
+            }
+          } while (retry);
+
+          doAfterClear(regionEvent);
+        } finally {
+          partitionedRegion.getDataStore().unLockBucketCreationForRegionClear();
+        }
+      } else {
+        // Non data-store with client queue and listener
+        doAfterClear(regionEvent);
+      }
+    }
+  }

Review comment:
       The method `doAfterClear(regionEvent)` is invoked no matter what (at least when no exception is thrown), so moving it outside of the `if/else` block might make things more readable.
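       Roughly, a sketch of that reordering (behaviour is the same when no exception is thrown, though note it also moves the notification outside the bucket-creation lock for data stores, which may or may not be intended):

```java
  public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite,
      RegionVersionVector vector) {
    // Synchronized to handle the requester departure.
    synchronized (lockForListenerAndClientNotification) {
      if (partitionedRegion.getDataStore() != null) {
        partitionedRegion.getDataStore().lockBucketCreationForRegionClear();
        try {
          boolean retry;
          do {
            retry = false;
            for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
                .getAllLocalPrimaryBucketRegions()) {
              if (localPrimaryBucketRegion.size() > 0) {
                localPrimaryBucketRegion.clear();
              }
            }
            if (membershipchange) {
              membershipchange = false;
              retry = true;
              waitForPrimary();
            }
          } while (retry);
        } finally {
          partitionedRegion.getDataStore().unLockBucketCreationForRegionClear();
        }
      }
      // Data stores and members with only a client queue / listener both notify here.
      doAfterClear(regionEvent);
    }
  }
```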

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPartitionedRegion {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final PartitionedRegion partitionedRegion;
+
+  private LockForListenerAndClientNotification lockForListenerAndClientNotification =

Review comment:
       +1

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
##########
@@ -459,4 +459,7 @@ VersionedObjectList basicRemoveAll(Collection<Object> keys,
    * @return true if synchronization should be attempted
    */
   boolean shouldSyncForCrashedMember(InternalDistributedMember id);
+
+  void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite,

Review comment:
       Maybe it's just me, but the old method name, `clearRegionLocally`, seems easier for the reader to understand.
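       For reference, with the previous name the interface declaration would read (signature assumed to match the implementation quoted earlier):

```java
  void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
      RegionVersionVector vector);
```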




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Partitioned Region clear operations must invoke cache level listeners
> ---------------------------------------------------------------------
>
>                 Key: GEODE-7678
>                 URL: https://issues.apache.org/jira/browse/GEODE-7678
>             Project: Geode
>          Issue Type: Sub-task
>          Components: regions
>            Reporter: Nabarun Nag
>            Priority: Major
>              Labels: GeodeCommons
>
> Clear operations are successful and CacheListener.afterRegionClear(), 
> CacheWriter.beforeRegionClear() are invoked.
>  
> Acceptance :
>  * DUnit tests validating the above behavior.
>  * Test coverage to when a member departs in this scenario
>  * Test coverage to when a member restarts in this scenario
>  * Unit tests with complete code coverage for the newly written code.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
