[ https://issues.apache.org/jira/browse/GEODE-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094798#comment-17094798 ]
ASF GitHub Bot commented on GEODE-7678: --------------------------------------- DonalEvans commented on a change in pull request #4987: URL: https://github.com/apache/geode/pull/4987#discussion_r416766676 ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java ########## @@ -302,4 +349,64 @@ public void afterDestroy(final EntryEvent<String, Integer> event) { errorCollector.checkThat(event.getNewValue(), nullValue()); } } + + protected class ClearCountingCacheListener extends BaseCacheListener { Review comment: Is there a reason that we need six different CacheListener implementations here? Would it be possible to combine them into one? If not, for consistency, it might be good to have each one only implement the "afterX" method that that listener actually cares about, like how the `InvalidateCountingCacheListener` only implements the `afterInvalidate` method. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java ########## @@ -38,22 +45,57 @@ @SuppressWarnings("serial") public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest { - @Parameters(name = "{index}: redundancy={0}") - public static Iterable<Integer> data() { - return Arrays.asList(0, 3); + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {1, Boolean.FALSE}, + {3, Boolean.TRUE}, + }); } @Parameter public int redundancy; + @Parameter(1) + public Boolean withData; + @Override protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { + LogService.getLogger() + .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]"); PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundancy); RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory(); - regionFactory.addCacheListener(listener); + if (listener != null) { + regionFactory.addCacheListener(listener); + } + regionFactory.setDataPolicy(DataPolicy.PARTITION); + regionFactory.setPartitionAttributes(paf.create()); + + return regionFactory.create(name); + } + + private void withData(Region region) { + if (withData) { + // Fewer buckets. + // Covers case where node doesn';'t have any buckets depending on redundancy. Review comment: Small typo here, should be `doesn't`. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = Review comment: Could this also be `final`? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; Review comment: This should probably be membershipChange. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + // properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); + } + } + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void normalClearFromDataStore() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void normalClearFromClient() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void clearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void normalClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearBeforeMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CacheClose", 30, SECONDS); Review comment: The gate names "CacheClose" and "CacheClosed" are very similar. It might be better to extract the strings to constants with names like `START_CACHE_CLOSE` and `CACHE_CLOSE_COMPLETE` or something similarly descriptive. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. + synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); + } + } while (retry); + + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unLockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + } + + private void doAfterClear(RegionEventImpl regionEvent) { + if (partitionedRegion.hasAnyClientsInterested()) { + notifyClients(regionEvent); + } + + if (partitionedRegion.hasListener()) { + partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + } + + void notifyClients(RegionEventImpl event) { + // Set client routing information into the event + // The clear operation in case of PR is distributed differently + // hence the FilterRoutingInfo is set here instead of + // DistributedCacheOperation.distribute(). + event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion + .isUsedForPartitionedRegionAdmin() + && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion + .isUsedForParallelGatewaySenderQueue()) { + + FilterRoutingInfo localCqFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event, + FilterProfile.NO_PROFILES, Collections.emptySet()); + + FilterRoutingInfo localCqInterestFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event); + + if (localCqInterestFrInfo != null) { + event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo()); + } + } + partitionedRegion.notifyBridgeClients(event); + } + + protected void obtainClearLockLocal(InternalDistributedMember requester) { + synchronized (lockForListenerAndClientNotification) { + // Check if the member is still part of the distributed system + if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) { + return; + } + + lockForListenerAndClientNotification.setLocked(requester); + if (partitionedRegion.getDataStore() != null) { + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } catch (Exception ex) { + partitionedRegion.checkClosed(); + } + } + } + } + } + + protected void releaseClearLockLocal() { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() == null) { + // The member has been left. + return; + } + try { + if (partitionedRegion.getDataStore() != null) { + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.releaseLockLocallyForClear(null); + } catch (Exception ex) { + logger.debug( + "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion + .getName(), + ex.getMessage()); + partitionedRegion.checkClosed(); + } + } + } + } finally { + lockForListenerAndClientNotification.setUnLocked(); + } + } + } + + private void sendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); + eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); + + boolean retry = true; + while (retry) { + retry = attemptToSendClearRegionMessage(event, op); + } + } + + private boolean attemptToSendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + if (partitionedRegion.getPRRoot() == null) { + if (logger.isDebugEnabled()) { + logger.debug( + "Partition region {} failed to initialize. Remove its profile from remote members.", + this); + } + new UpdateAttributesProcessor(partitionedRegion, true).distribute(false); + return false; + } + final HashSet configRecipients = + new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); + + // It's possible this instance has not been initialized + // or hasn't gotten through initialize() far enough to have + // sent a CreateRegionProcessor message, bug 36048 + try { + final PartitionRegionConfig prConfig = + partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier()); + + if (prConfig != null) { + // Fix for bug#34621 by Tushar Review comment: This comment should probably be removed. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; Review comment: This timeout should probably not be hard-coded. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. + synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); Review comment: It seems possible that we could end up calling `waitForPrimary()` more than once over the course of a clear operation. Given that, might it be a good idea to move the tracking of the timeout to this method rather than the `waitForPrimary()` method? Otherwise we might not go past the timeout in each individual `waitForPrimary()` call but still timeout on the clear operation as a whole. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java ########## @@ -575,16 +575,23 @@ public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, bool // get rvvLock Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion(); + boolean isLockedAlready = this.partitionedRegion.getClearPartitionedRegion() + .isLockedForListenerAndClientNotification(); + try { - obtainWriteLocksForClear(regionEvent, participants); + if (!isLockedAlready) { + obtainWriteLocksForClear(regionEvent, participants); + } Review comment: There might exist a small race condition here that the value of `isLockedAlready` changes between the call to `isLockedForListenerAndClientNotification()` and the call to `obtainWriteLocksForClear()`. Is this something that can actually happen? What will happen if `isLockedAlready` is initially false, but before we can call `obtainWriteLocksForClear()`, the actual value becomes true? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. + synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); + } + } while (retry); + + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unLockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + } + + private void doAfterClear(RegionEventImpl regionEvent) { + if (partitionedRegion.hasAnyClientsInterested()) { + notifyClients(regionEvent); + } + + if (partitionedRegion.hasListener()) { + partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + } + + void notifyClients(RegionEventImpl event) { + // Set client routing information into the event + // The clear operation in case of PR is distributed differently + // hence the FilterRoutingInfo is set here instead of + // DistributedCacheOperation.distribute(). + event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion + .isUsedForPartitionedRegionAdmin() + && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion + .isUsedForParallelGatewaySenderQueue()) { + + FilterRoutingInfo localCqFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event, + FilterProfile.NO_PROFILES, Collections.emptySet()); + + FilterRoutingInfo localCqInterestFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event); + + if (localCqInterestFrInfo != null) { + event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo()); + } + } + partitionedRegion.notifyBridgeClients(event); + } + + protected void obtainClearLockLocal(InternalDistributedMember requester) { + synchronized (lockForListenerAndClientNotification) { + // Check if the member is still part of the distributed system + if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) { + return; + } + + lockForListenerAndClientNotification.setLocked(requester); + if (partitionedRegion.getDataStore() != null) { + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } catch (Exception ex) { + partitionedRegion.checkClosed(); + } + } + } + } + } + + protected void releaseClearLockLocal() { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() == null) { + // The member has been left. + return; + } + try { + if (partitionedRegion.getDataStore() != null) { + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.releaseLockLocallyForClear(null); + } catch (Exception ex) { + logger.debug( + "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion + .getName(), + ex.getMessage()); + partitionedRegion.checkClosed(); + } + } + } + } finally { + lockForListenerAndClientNotification.setUnLocked(); + } + } + } + + private void sendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); + eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); + + boolean retry = true; + while (retry) { + retry = attemptToSendClearRegionMessage(event, op); + } + } + + private boolean attemptToSendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + if (partitionedRegion.getPRRoot() == null) { + if (logger.isDebugEnabled()) { + logger.debug( + "Partition region {} failed to initialize. Remove its profile from remote members.", + this); Review comment: For this log output, the `this` should probably be replaced with `partitionedRegion`. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegionMessage.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.cache.partitioned.PartitionMessage; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegionMessage extends PartitionMessage { + private static final Logger logger = LogService.getLogger(); + + public static enum OperationType { Review comment: The `static` here is not necessary. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegionMessage.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.cache.partitioned.PartitionMessage; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegionMessage extends PartitionMessage { + private static final Logger logger = LogService.getLogger(); + + public static enum OperationType { + OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR, + } + + private Object cbArg; + + private OperationType op; + + private EventID eventID; + + private PartitionedRegion partitionedRegion; + + private Set<InternalDistributedMember> recipients; + + @Override + public EventID getEventID() { + return eventID; + } + + public ClearPartitionedRegionMessage() {} + + ClearPartitionedRegionMessage(Set recipients, PartitionedRegion region, + ReplyProcessor21 processor, ClearPartitionedRegionMessage.OperationType operationType, + final RegionEventImpl event) { + super(recipients, region.getPRId(), processor); + this.recipients = recipients; + partitionedRegion = region; + op = operationType; + cbArg = event.getRawCallbackArgument(); + eventID = event.getEventId(); + } + + public OperationType getOp() { + return op; + } + + public void send() { + Assert.assertTrue(recipients != null, "ClearMessage NULL recipients set"); + setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed()); + partitionedRegion.getDistributionManager().putOutgoing(this); + } + + @Override + protected Throwable processCheckForPR(PartitionedRegion pr, + DistributionManager distributionManager) { + if (pr != null && !pr.getDistributionAdvisor().isInitialized()) { + Throwable thr = new ForceReattemptException( + String.format("%s : could not find partitioned region with Id %s", + distributionManager.getDistributionManagerId(), + pr.getRegionIdentifier())); + return thr; + } + return null; + } + + + @Override + protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion r, + long startTime) throws CacheException { + + if (r == null) { + return true; + } + + if (r.isDestroyed()) { + return true; + } + + if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) { + r.getClearPartitionedRegion().obtainClearLockLocal(getSender()); + } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { + r.getClearPartitionedRegion().releaseClearLockLocal(); + } else { + RegionEventImpl event = + new RegionEventImpl(r, Operation.REGION_CLEAR, this.cbArg, true, r.getMyId(), + getEventID()); + r.getClearPartitionedRegion().clearRegionLocal(event, false, null); + } + return true; + } + + @Override + protected void appendFields(StringBuilder buff) { + super.appendFields(buff); + buff.append("; cbArg=").append(this.cbArg).append("; op=").append(this.op); + } + + @Override + public int getDSFID() { + return CLEAR_PARTITIONED_REGION_MESSAGE; + } + + @Override + public void fromData(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { + super.fromData(in, context); + this.cbArg = DataSerializer.readObject(in); + op = ClearPartitionedRegionMessage.OperationType.values()[in.readByte()]; + eventID = DataSerializer.readObject(in); + } + + @Override + public void toData(DataOutput out, + SerializationContext context) throws IOException { + super.toData(out, context); + DataSerializer.writeObject(this.cbArg, out); + out.writeByte(op.ordinal()); + DataSerializer.writeObject(eventID, out); + } + + /** + * The response on which to wait for all the replies. This response ignores any exceptions + * received from the "far side" + * + * @since GemFire 5.0 Review comment: This line should be removed, or updated. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegionMessage.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.cache.partitioned.PartitionMessage; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegionMessage extends PartitionMessage { + private static final Logger logger = LogService.getLogger(); Review comment: This logger is never used in this class. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + // Synchronized to handle the requester departure. + synchronized (lockForListenerAndClientNotification) { + if (partitionedRegion.getDataStore() != null) { + partitionedRegion.getDataStore().lockBucketCreationForRegionClear(); + try { + boolean retry; + do { + retry = false; + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + if (localPrimaryBucketRegion.size() > 0) { + localPrimaryBucketRegion.clear(); + } + } + if (membershipchange) { + membershipchange = false; + retry = true; + waitForPrimary(); + } + } while (retry); + + doAfterClear(regionEvent); + } finally { + partitionedRegion.getDataStore().unLockBucketCreationForRegionClear(); + } + } else { + // Non data-store with client queue and listener + doAfterClear(regionEvent); + } + } + } + + private void doAfterClear(RegionEventImpl regionEvent) { + if (partitionedRegion.hasAnyClientsInterested()) { + notifyClients(regionEvent); + } + + if (partitionedRegion.hasListener()) { + partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + } + + void notifyClients(RegionEventImpl event) { + // Set client routing information into the event + // The clear operation in case of PR is distributed differently + // hence the FilterRoutingInfo is set here instead of + // DistributedCacheOperation.distribute(). + event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion + .isUsedForPartitionedRegionAdmin() + && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion + .isUsedForParallelGatewaySenderQueue()) { + + FilterRoutingInfo localCqFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event, + FilterProfile.NO_PROFILES, Collections.emptySet()); + + FilterRoutingInfo localCqInterestFrInfo = + partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event); + + if (localCqInterestFrInfo != null) { + event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo()); + } + } + partitionedRegion.notifyBridgeClients(event); + } + + protected void obtainClearLockLocal(InternalDistributedMember requester) { + synchronized (lockForListenerAndClientNotification) { + // Check if the member is still part of the distributed system + if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) { + return; + } + + lockForListenerAndClientNotification.setLocked(requester); + if (partitionedRegion.getDataStore() != null) { + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } catch (Exception ex) { + partitionedRegion.checkClosed(); + } + } + } + } + } + + protected void releaseClearLockLocal() { + synchronized (lockForListenerAndClientNotification) { + if (lockForListenerAndClientNotification.getLockRequester() == null) { + // The member has been left. + return; + } + try { + if (partitionedRegion.getDataStore() != null) { + + for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() + .getAllLocalPrimaryBucketRegions()) { + try { + localPrimaryBucketRegion.releaseLockLocallyForClear(null); + } catch (Exception ex) { + logger.debug( + "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion + .getName(), + ex.getMessage()); + partitionedRegion.checkClosed(); + } + } + } + } finally { + lockForListenerAndClientNotification.setUnLocked(); + } + } + } + + private void sendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); + eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); + + boolean retry = true; + while (retry) { + retry = attemptToSendClearRegionMessage(event, op); + } + } + + private boolean attemptToSendClearRegionMessage(RegionEventImpl event, + ClearPartitionedRegionMessage.OperationType op) { + if (partitionedRegion.getPRRoot() == null) { + if (logger.isDebugEnabled()) { + logger.debug( + "Partition region {} failed to initialize. Remove its profile from remote members.", + this); + } + new UpdateAttributesProcessor(partitionedRegion, true).distribute(false); + return false; + } + final HashSet configRecipients = + new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); + + // It's possible this instance has not been initialized + // or hasn't gotten through initialize() far enough to have + // sent a CreateRegionProcessor message, bug 36048 + try { + final PartitionRegionConfig prConfig = + partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier()); + + if (prConfig != null) { + // Fix for bug#34621 by Tushar + Iterator itr = prConfig.getNodes().iterator(); + while (itr.hasNext()) { + InternalDistributedMember idm = ((Node) itr.next()).getMemberId(); + if (!idm.equals(partitionedRegion.getMyId())) { + configRecipients.add(idm); + } + } + } + } catch (CancelException ignore) { + // ignore + } + + try { + ClearPartitionedRegionMessage.ClearPartitionedRegionResponse resp = + new ClearPartitionedRegionMessage.ClearPartitionedRegionResponse( + partitionedRegion.getSystem(), + configRecipients); + ClearPartitionedRegionMessage clearPartitionedRegionMessage = + new ClearPartitionedRegionMessage(configRecipients, partitionedRegion, resp, op, event); + clearPartitionedRegionMessage.send(); + + resp.waitForRepliesUninterruptibly(); + + } catch (ReplyException e) { + if (e.getRootCause() instanceof ForceReattemptException) { + return true; + } + logger.warn( + "PartitionedRegion#sendClearRegionMessage: Caught exception during ClearRegionMessage send and waiting for response", + e); + } + return false; + } + + void doClear(RegionEventImpl regionEvent, boolean cacheWrite, + PartitionedRegion partitionedRegion) { + String lockName = "_clearOperation" + partitionedRegion.getDisplayName(); Review comment: The "_clearOperation" String should be extracted to a constant, since it's used in multiple places. ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegion.java ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegion { + + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegion partitionedRegion; + + private LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipchange = false; + + public ClearPartitionedRegion(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(new ClearPartitionedRegionListener()); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { + try { + partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + } catch (Exception ex) { + logger.warn("Caught exception while unlocking clear distributed lock", ex.getMessage()); + } + } + + void obtainLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); + } + + void releaseLockForClear(RegionEventImpl event) { + sendClearRegionMessage(event, + ClearPartitionedRegionMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + releaseClearLockLocal(); + } + + void clearRegion(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { + clearRegionLocal(regionEvent, cacheWrite, null); + sendClearRegionMessage(regionEvent, + ClearPartitionedRegionMessage.OperationType.OP_PR_CLEAR); + } + + private void waitForPrimary() { + boolean retry; + int retryTime = 2 * 60 * 1000 /* partitionedRegion.getRetryTimeout() */; + PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); + do { + retry = false; + for (BucketRegion bucketRegion : partitionedRegion.getDataStore() + .getAllLocalBucketRegions()) { + if (!bucketRegion.getBucketAdvisor().hasPrimary()) { + if (retryTimer.overMaximum()) { + PRHARedundancyProvider.timedOut(partitionedRegion, null, null, + "do clear. Missing primary bucket", + retryTime); + } + retryTimer.waitForBucketsRecovery(); + retry = true; + } + } + } while (retry); + } + + public void clearRegionLocal(RegionEventImpl regionEvent, boolean cacheWrite, + RegionVersionVector vector) { Review comment: The `cacheWrite` and `vector` arguments are currently not used in this method. Can they be removed from the signature, or will they be used in future? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/ClearPartitionedRegionMessage.java ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.cache.partitioned.PartitionMessage; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPartitionedRegionMessage extends PartitionMessage { + private static final Logger logger = LogService.getLogger(); + + public static enum OperationType { + OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR, + } + + private Object cbArg; + + private OperationType op; + + private EventID eventID; + + private PartitionedRegion partitionedRegion; + + private Set<InternalDistributedMember> recipients; + + @Override + public EventID getEventID() { + return eventID; + } + + public ClearPartitionedRegionMessage() {} + + ClearPartitionedRegionMessage(Set recipients, PartitionedRegion region, + ReplyProcessor21 processor, ClearPartitionedRegionMessage.OperationType operationType, + final RegionEventImpl event) { + super(recipients, region.getPRId(), processor); + this.recipients = recipients; + partitionedRegion = region; + op = operationType; + cbArg = event.getRawCallbackArgument(); + eventID = event.getEventId(); + } + + public OperationType getOp() { + return op; + } + + public void send() { + Assert.assertTrue(recipients != null, "ClearMessage NULL recipients set"); + setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed()); + partitionedRegion.getDistributionManager().putOutgoing(this); + } + + @Override + protected Throwable processCheckForPR(PartitionedRegion pr, + DistributionManager distributionManager) { + if (pr != null && !pr.getDistributionAdvisor().isInitialized()) { + Throwable thr = new ForceReattemptException( + String.format("%s : could not find partitioned region with Id %s", + distributionManager.getDistributionManagerId(), + pr.getRegionIdentifier())); + return thr; + } + return null; + } + + + @Override + protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion r, + long startTime) throws CacheException { + + if (r == null) { + return true; + } + + if (r.isDestroyed()) { + return true; + } + + if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) { + r.getClearPartitionedRegion().obtainClearLockLocal(getSender()); + } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { + r.getClearPartitionedRegion().releaseClearLockLocal(); + } else { + RegionEventImpl event = + new RegionEventImpl(r, Operation.REGION_CLEAR, this.cbArg, true, r.getMyId(), + getEventID()); + r.getClearPartitionedRegion().clearRegionLocal(event, false, null); + } + return true; + } + + @Override + protected void appendFields(StringBuilder buff) { + super.appendFields(buff); + buff.append("; cbArg=").append(this.cbArg).append("; op=").append(this.op); Review comment: For consistency with the format of `ParitionMessage.toString()`, either these strings should not have semicolons, or `PartitionMessage` should be modified to have semicolon-separated output. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Partitioned Region clear operations must invoke cache level listeners > --------------------------------------------------------------------- > > Key: GEODE-7678 > URL: https://issues.apache.org/jira/browse/GEODE-7678 > Project: Geode > Issue Type: Sub-task > Components: regions > Reporter: Nabarun Nag > Priority: Major > Labels: GeodeCommons > > Clear operations are successful and CacheListener.afterRegionClear(), > CacheWriter.beforeRegionClear() are invoked. > > Acceptance : > * DUnit tests validating the above behavior. > * Test coverage to when a member departs in this scenario > * Test coverage to when a member restarts in this scenario > * Unit tests with complete code coverage for the newly written code. > -- This message was sent by Atlassian Jira (v8.3.4#803005)