[ https://issues.apache.org/jira/browse/GEODE-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095355#comment-17095355 ]
ASF GitHub Bot commented on GEODE-7678: --------------------------------------- jujoramos commented on a change in pull request #4987: URL: https://github.com/apache/geode/pull/4987#discussion_r417216163 ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java ########## @@ -302,4 +349,64 @@ public void afterDestroy(final EntryEvent<String, Integer> event) { errorCollector.checkThat(event.getNewValue(), nullValue()); } } + + protected class ClearCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionClear(RegionEvent<String, Integer> event) { + + sharedCountersRule.increment(CLEAR); + if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR)); + errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName)); + } + } + + protected class RegionDestroyCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionDestroy(final RegionEvent<String, Integer> event) { + sharedCountersRule.increment(REGION_DESTROY); + + if 
(!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } Review comment: This code is duplicated in the test class; maybe it would be better to extract it into its own method? ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int 
locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); Review comment: You can delete this field as it's not used. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java ########## @@ -38,22 +45,57 @@ @SuppressWarnings("serial") public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest { - @Parameters(name = "{index}: redundancy={0}") - public static Iterable<Integer> data() { - return Arrays.asList(0, 3); + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {1, Boolean.FALSE}, + {3, Boolean.TRUE}, + }); } @Parameter public int redundancy; + @Parameter(1) + public Boolean withData; + @Override protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { + LogService.getLogger() + .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]"); PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundancy); RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory(); - regionFactory.addCacheListener(listener); + if (listener != null) { + regionFactory.addCacheListener(listener); + } + regionFactory.setDataPolicy(DataPolicy.PARTITION); + regionFactory.setPartitionAttributes(paf.create()); + + return regionFactory.create(name); Review comment: This code is duplicated in the test class, maybe better to extract this to its own method?. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import 
org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return 
RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) Review comment: Is there any reason for overriding the default `GeodeAwaitility` timeout? Maybe we should keep the default to avoid flakiness (for cases where there is resource contention while running the tests). ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int 
locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + 
region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); Review comment: Can we throw an exception here instead of using the `fail` method?. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import 
org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); 
+ } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); + } + } + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void 
verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void normalClearFromDataStore() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void normalClearFromClient() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + 
verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void clearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void normalClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) Review comment: Can we use `assertThatThrownBy` instead?. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + 
partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. 
+ synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); + } + } while (retry); + + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unLockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + } + + private void doAfterClear(RegionEventImpl regionEvent) { + if (partitionedRegion.hasAnyClientsInterested()) { + notifyClients(regionEvent); + } + + if (partitionedRegion.hasListener()) { + partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + } + + void notifyClients(RegionEventImpl event) { + // Set client routing information into the event + // The clear operation in case of PR is distributed differently + // hence the FilterRoutingInfo is set here instead of + // DistributedCacheOperation.distribute(). 
+ event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion + .isUsedForPartitionedRegionAdmin() + && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion + .isUsedForParallelGatewaySenderQueue()) { + + FilterRoutingInfo localCqFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event, + FilterProfile.NO_PROFILES, Collections.emptySet()); + + FilterRoutingInfo localCqInterestFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event); + + if (localCqInterestFrInfo != null) { + event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo()); + } + } + partitionedRegion.notifyBridgeClients(event); + } + + protected void obtainClearLockLocal(InternalDistributedMember requester) { + synchronized (lockForListenerAndClientNotification) { + // Check if the member is still part of the distributed system + if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) { + return; + } + + lockForListenerAndClientNotification.setLocked(requester); + if (partitionedRegion.getDataStore() != null) { + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } catch (Exception ex) { + partitionedRegion.checkClosed(); + } + } + } + } + } + + protected void releaseClearLockLocal() { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() == null) { + // The member has been left. 
+ return; + } + try { + if (partitionedRegion.getDataStore() != null) { + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.releaseLockLocallyForClear(null); + } catch (Exception ex) { + logger.debug( + "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion + .getName(), + ex.getMessage()); + partitionedRegion.checkClosed(); + } + } + } + } finally { + lockForListenerAndClientNotification.setUnLocked(); + } + } + } + + private void sendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); + eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); + + boolean retry = true; + while (retry) { + retry = attemptToSendClearRegionMessage(event, op); + } + } + + private boolean attemptToSendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + if (partitionedRegion.getPRRoot() == null) { + if (logger.isDebugEnabled()) { + logger.debug( + "Partition region {} failed to initialize. 
Remove its profile from remote members.", + this); + } + new UpdateAttributesProcessor(partitionedRegion, true).distribute(false); + return false; + } + final HashSet configRecipients = + new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); + + // It's possible this instance has not been initialized + // or hasn't gotten through initialize() far enough to have + // sent a CreateRegionProcessor message, bug 36048 + try { + final PartitionRegionConfig prConfig = + partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier()); + + if (prConfig != null) { + // Fix for bug#34621 by Tushar + Iterator itr = prConfig.getNodes().iterator(); + while (itr.hasNext()) { + InternalDistributedMember idm = ((Node) itr.next()).getMemberId(); + if (!idm.equals(partitionedRegion.getMyId())) { + configRecipients.add(idm); + } + } + } + } catch (CancelException ignore) { + // ignore + } + + try { + ClearPartitionedRegionMessage.ClearPartitionedRegionResponse resp = + new ClearPartitionedRegionMessage.ClearPartitionedRegionResponse( + partitionedRegion.getSystem(), + configRecipients); + ClearPartitionedRegionMessage clearPartitionedRegionMessage = + new ClearPartitionedRegionMessage(configRecipients, partitionedRegion, resp, op, event); + clearPartitionedRegionMessage.send(); + + resp.waitForRepliesUninterruptibly(); + + } catch (ReplyException e) { + if (e.getRootCause() instanceof ForceReattemptException) { + return true; + } + logger.warn( + "PartitionedRegion#sendClearRegionMessage: Caught exception during ClearRegionMessage send and waiting for response", + e); + } + return false; + } + + void doClear(RegionEventImpl regionEvent, boolean cacheWrite, + PartitionedRegion partitionedRegion) { + String lockName = "_clearOperation" + partitionedRegion.getDisplayName(); + + try { + // distributed lock to make sure only one clear op is in progress in the cluster. 
+ acquireDistributedClearLock(lockName); + + // Force all primary buckets to be created before clear. + PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion); + + // do cacheWrite + partitionedRegion.cacheWriteBeforeRegionClear(regionEvent); + + // Check if there are any listeners or clients interested. If so, then clear write + // locks needs to be taken on all local and remote primary buckets in order to + // preserve the ordering of client events (for concurrent operations on the region). + boolean acquireClearLockForClientNotification = + (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener()); + if (acquireClearLockForClientNotification) { + obtainLockForClear(regionEvent); + } + try { + clearRegion(regionEvent, cacheWrite, null); + } finally { + if (acquireClearLockForClientNotification) { + releaseLockForClear(regionEvent); + } + } + + } finally { + releaseDistributedClearLock(lockName); + } + } + + void handleClearFromDepartedMember(InternalDistributedMember departedMember) { + if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() != null) { + releaseClearLockLocal(); + } + } + } + } + + class LockForListenerAndClientNotification { + + private volatile boolean locked = false; + + private volatile InternalDistributedMember lockRequester; Review comment: I think we don't need the `volatile` keyword here, as the fields are `private` and all methods (both accessors and setters) are `synchronized` already. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements.
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import 
org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return 
RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); + } + } + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new 
PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void normalClearFromDataStore() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + 
verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void normalClearFromClient() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void clearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void normalClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearBeforeMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CacheClose", 30, SECONDS); + + dataStore1.invoke(() -> 
getCache().close()); + getBlackboard().signalGate("CacheClosed"); + + // This should not be blocked. + dataStore2.invoke(() -> feed(false)); + dataStore3.invoke(() -> feed(false)); + + dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + + ds1ClearAsync.await(); + } + + @Test + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembersAfterMessageProcessed() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearAfterMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CacheClose", 30, SECONDS); + + dataStore1.invoke(() -> getCache().close()); + getBlackboard().signalGate("CacheClosed"); + + // This should not be blocked. + dataStore2.invoke(() -> feed(false)); + dataStore3.invoke(() -> feed(false)); + + dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES)); + + ds1ClearAsync.await(); + } + + + private static class CountingCacheListener extends CacheListenerAdapter { + private final AtomicInteger clears = new AtomicInteger(); + + @Override + public void afterRegionClear(RegionEvent event) { + Region region = event.getRegion(); Review comment: Unused variable. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import 
org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // 
properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); + } + } + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + 
} + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void normalClearFromDataStore() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void normalClearFromClient() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + 
client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void clearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void normalClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearBeforeMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CacheClose", 30, SECONDS); Review comment: +1 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { Review comment: Can we change the name to something more "meaningful"? Maybe `PartitionedRegionClearer`, or something else that lets the reader know that the class holds the actual implementation for clearing a `Partitioned Region`? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements.
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; Review comment: +1 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java ########## @@ -980,6 +980,14 @@ protected void lockBucketCreationAndVisit(BucketVisitor 
visitor) { } } + protected void lockBucketCreationForRegionClear() { + bucketCreationLock.writeLock().lock(); + } + + protected void unLockBucketCreationForRegionClear() { Review comment: Should probably be `unlockBucketCreationForRegionClear`. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java ########## @@ -834,7 +834,7 @@ public InternalDistributedMember adviseFixedPrimaryPartitionDataStore(final int }); } - Set adviseAllServersWithInterest() { + public Set adviseAllServersWithInterest() { Review comment: No need to make it `public`, it's only invoked from `PRTombstoneMessage`, which belongs to the same package. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear 
distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { Review comment: +1 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return 
lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); Review comment: I think you're missing a `{}` within the log message, otherwise the `ex.getMessage()` won't be shown. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear 
distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. 
+ synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); + } + } while (retry); + + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unLockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + } Review comment: The method `doAfterClear(regionEvent)` is invoked no matter what (at least when there's no exception thrown), so maybe moving it outside of the `if/else` block might make things more readable. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = Review comment: +1 ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java ########## @@ -459,4 +459,7 @@ VersionedObjectList basicRemoveAll(Collection<Object> keys, * @return true if synchronization should be attempted */ boolean shouldSyncForCrashedMember(InternalDistributedMember id); + + void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, Review comment: Maybe it's just me, but the old method name, `clearRegionLocally`, seems to be easier to understand for the reader. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Partitioned Region clear operations must invoke cache level listeners > --------------------------------------------------------------------- > > Key: GEODE-7678 > URL: https://issues.apache.org/jira/browse/GEODE-7678 > Project: Geode > Issue Type: Sub-task > Components: regions > Reporter: Nabarun Nag > Priority: Major > Labels: GeodeCommons > > Clear operations are successful and CacheListener.afterRegionClear(), > CacheWriter.beforeRegionClear() are invoked. > > Acceptance : > * DUnit tests validating the above behavior. > * Test coverage to when a member departs in this scenario > * Test coverage to when a member restarts in this scenario > * Unit tests with complete code coverage for the newly written code. > -- This message was sent by Atlassian Jira (v8.3.4#803005)