jujoramos commented on a change in pull request #4970: URL: https://github.com/apache/geode/pull/4970#discussion_r411996029
########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.ExpirationAction.DESTROY; +import static org.apache.geode.cache.ExpirationAction.INVALIDATE; +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; +import static org.apache.geode.internal.util.ArrayUtils.asList; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.ForcedDisconnectException; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.ExpirationAction; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.util.CacheWriterAdapter; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; +import org.apache.geode.test.dunit.rules.DistributedRule; + +/** + * Tests to verify that {@link PartitionedRegion#clear()} cancels all remaining expiration tasks + * on the {@link PartitionedRegion} once the operation is executed. + */ +@RunWith(JUnitParamsRunner.class) +public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable { + private static final Integer BUCKETS = 13; + private static final Integer EXPIRATION_TIME = 30; + private static final String REGION_NAME = "PartitionedRegion"; + private static final String TEST_CASE_NAME = + "[{index}] {method}(Coordinator:{0}, RegionType:{1}, ExpirationAction:{2})"; + + @Rule + public DistributedRule distributedRule = new DistributedRule(3); + + @Rule + public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); + + @Rule + public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule(); + + private VM accessor, server1, server2; + + private enum TestVM { + ACCESSOR(0), SERVER1(1), SERVER2(2); + + final int vmNumber; + + TestVM(int vmNumber) { + this.vmNumber = vmNumber; + } + } + + @SuppressWarnings("unused") + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION, + PARTITION_OVERFLOW, + PARTITION_REDUNDANT, + PARTITION_REDUNDANT_OVERFLOW, + + PARTITION_PERSISTENT, + PARTITION_PERSISTENT_OVERFLOW, + PARTITION_REDUNDANT_PERSISTENT, + PARTITION_REDUNDANT_PERSISTENT_OVERFLOW + }; + } + + @SuppressWarnings("unused") + static Object[] regionTypesAndExpirationActions() { + ArrayList<Object[]> parameters = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + parameters.add(new Object[] {regionShortcut, DESTROY}); + parameters.add(new Object[] {regionShortcut, INVALIDATE}); + }); + + return parameters.toArray(); + } + + @SuppressWarnings("unused") + static Object[] vmsRegionTypesAndExpirationActions() { + ArrayList<Object[]> parameters = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + parameters.add(new Object[] {TestVM.SERVER1, regionShortcut, DESTROY}); + parameters.add(new Object[] {TestVM.SERVER1, regionShortcut, INVALIDATE}); + parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut, DESTROY}); + parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut, INVALIDATE}); + }); + + return parameters.toArray(); + } + + @Before + public void setUp() throws Exception { + server1 = getVM(TestVM.SERVER1.vmNumber); + server2 = getVM(TestVM.SERVER2.vmNumber); + accessor = getVM(TestVM.ACCESSOR.vmNumber); + } + + private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) { + if (dataStoreRegionShortcut.isPersistent()) { + switch (dataStoreRegionShortcut) { + case PARTITION_PERSISTENT: + return PARTITION; + case PARTITION_PERSISTENT_OVERFLOW: + return PARTITION_OVERFLOW; + case PARTITION_REDUNDANT_PERSISTENT: + return PARTITION_REDUNDANT; + case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: + return PARTITION_REDUNDANT_OVERFLOW; + } + } + + return dataStoreRegionShortcut; + } + + private void initAccessor(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut); + PartitionAttributes<String, String> attributes = + new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .setLocalMaxMemory(0) + .create(); + + cacheRule.getCache() + .<String, String>createRegionFactory(accessorShortcut) + .setPartitionAttributes(attributes) + .setEntryTimeToLive(expirationAttributes) + .setEntryIdleTimeout(expirationAttributes) + .create(REGION_NAME); + } + + private void initDataStore(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + PartitionAttributes<String, String> attributes = + new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .create(); + + cacheRule.getCache() + .<String, String>createRegionFactory(regionShortcut) + .setPartitionAttributes(attributes) + .setEntryTimeToLive(expirationAttributes) + .setEntryIdleTimeout(expirationAttributes) + .create(REGION_NAME); + + ExpiryTask.expiryTaskListener = new ExpirationListener(); + } + + private void parametrizedSetup(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + server1.invoke(() -> initDataStore(regionShortcut, expirationAttributes)); + server2.invoke(() -> initDataStore(regionShortcut, expirationAttributes)); + accessor.invoke(() -> initAccessor(regionShortcut, expirationAttributes)); + } + + private void waitForSilence() { + DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats(); + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + PartitionedRegionStats partitionedRegionStats = region.getPrStats(); + + await().untilAsserted(() -> { + assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0); + }); + } + + /** + * Populates the region and verifies the data on the selected VMs. + */ + private void populateRegion(VM feeder, int entryCount, List<VM> vms) { + feeder.invoke(() -> { + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); + }); + + vms.forEach(vm -> vm.invoke(() -> { + waitForSilence(); + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + + IntStream.range(0, entryCount) + .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i)); + })); + } + + /** + * Asserts that the region is empty on requested VMs. + */ + private void assertRegionIsEmpty(List<VM> vms) { + vms.forEach(vm -> vm.invoke(() -> { + waitForSilence(); + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + + assertThat(region.getLocalSize()).isEqualTo(0); + })); + } + + /** + * Asserts that the region data is consistent across buckets. + */ + private void assertRegionBucketsConsistency() throws ForceReattemptException { + waitForSilence(); + List<BucketDump> bucketDumps; + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + // Redundant copies + 1 primary. + int expectedCopies = region.getRedundantCopies() + 1; + + for (int bucketId = 0; bucketId < BUCKETS; bucketId++) { + bucketDumps = region.getAllBucketEntries(bucketId); + assertThat(bucketDumps.size()).as("Bucket " + bucketId + " should have " + expectedCopies + + " copies, but has " + bucketDumps.size()).isEqualTo(expectedCopies); + + // Check that all copies of the bucket have the same data. + if (bucketDumps.size() > 1) { + BucketDump firstDump = bucketDumps.get(0); + + for (int j = 1; j < bucketDumps.size(); j++) { + BucketDump otherDump = bucketDumps.get(j); + assertThat(otherDump.getValues()) + .as("Values for bucket " + bucketId + " on member " + otherDump.getMember() + + " are not consistent with member " + firstDump.getMember()) + .isEqualTo(firstDump.getValues()); + assertThat(otherDump.getVersions()) + .as("Versions for bucket " + bucketId + " on member " + otherDump.getMember() + + " are not consistent with member " + firstDump.getMember()) + .isEqualTo(firstDump.getVersions()); + } + } + } + } + + /** + * Register the MemberKiller CacheWriter on the given vms and cancel auto-reconnects. + */ + private void registerVMKillerAsCacheWriter(List<VM> vmsToBounce) { + vmsToBounce.forEach(vm -> vm.invoke(() -> { + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + region.getAttributesMutator().setCacheWriter(new MemberKiller()); + })); + } + + /** + * The test does the following (clear coordinator and expiration action are parametrized): + * - Populates the Partition Region (entries have expiration). + * - Verifies that the entries are synchronized on all members. + * - Clears the Partition Region once. + * - Asserts that, after the clear is finished: + * . No expiration tasks were executed. + * . All expiration tasks were cancelled. + * . Map of expiry tasks per bucket is empty. + * . The Partition Region is empty on all members. + */ + @Test + @TestCaseName(TEST_CASE_NAME) + @Parameters(method = "vmsRegionTypesAndExpirationActions") + public void clearShouldRemoveRegisteredExpirationTasks(TestVM coordinatorVM, + RegionShortcut regionShortcut, ExpirationAction expirationAction) { + final int entries = 500; + parametrizedSetup(regionShortcut, new ExpirationAttributes(EXPIRATION_TIME, expirationAction)); + populateRegion(accessor, entries, asList(accessor, server1, server2)); Review comment: The `populateRegion` methods uses the `accessor` to insert data into the region and, afterwards, it verifies that the entries are in sync in all three VMs. It's true that only `server1` and `server2` are needed as those are the only members hosting data, but I prefer to use all members for verification purposes (the `accessor` ultimately gets the data from other members, but it's harmless to execute the verification anyway). ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.ExpirationAction.DESTROY; +import static org.apache.geode.cache.ExpirationAction.INVALIDATE; +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; +import static org.apache.geode.internal.util.ArrayUtils.asList; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.ForcedDisconnectException; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.ExpirationAction; +import org.apache.geode.cache.ExpirationAttributes; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.util.CacheWriterAdapter; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedDiskDirRule; +import org.apache.geode.test.dunit.rules.DistributedRule; + +/** + * Tests to verify that {@link PartitionedRegion#clear()} cancels all remaining expiration tasks + * on the {@link PartitionedRegion} once the operation is executed. + */ +@RunWith(JUnitParamsRunner.class) +public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable { + private static final Integer BUCKETS = 13; + private static final Integer EXPIRATION_TIME = 30; + private static final String REGION_NAME = "PartitionedRegion"; + private static final String TEST_CASE_NAME = + "[{index}] {method}(Coordinator:{0}, RegionType:{1}, ExpirationAction:{2})"; + + @Rule + public DistributedRule distributedRule = new DistributedRule(3); + + @Rule + public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); + + @Rule + public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule(); + + private VM accessor, server1, server2; + + private enum TestVM { + ACCESSOR(0), SERVER1(1), SERVER2(2); + + final int vmNumber; + + TestVM(int vmNumber) { + this.vmNumber = vmNumber; + } + } + + @SuppressWarnings("unused") + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION, + PARTITION_OVERFLOW, + PARTITION_REDUNDANT, + PARTITION_REDUNDANT_OVERFLOW, + + PARTITION_PERSISTENT, + PARTITION_PERSISTENT_OVERFLOW, + PARTITION_REDUNDANT_PERSISTENT, + PARTITION_REDUNDANT_PERSISTENT_OVERFLOW + }; + } + + @SuppressWarnings("unused") + static Object[] regionTypesAndExpirationActions() { + ArrayList<Object[]> parameters = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + parameters.add(new Object[] {regionShortcut, DESTROY}); + parameters.add(new Object[] {regionShortcut, INVALIDATE}); + }); + + return parameters.toArray(); + } + + @SuppressWarnings("unused") + static Object[] vmsRegionTypesAndExpirationActions() { + ArrayList<Object[]> parameters = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + parameters.add(new Object[] {TestVM.SERVER1, regionShortcut, DESTROY}); + parameters.add(new Object[] {TestVM.SERVER1, regionShortcut, INVALIDATE}); + parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut, DESTROY}); + parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut, INVALIDATE}); + }); + + return parameters.toArray(); + } + + @Before + public void setUp() throws Exception { + server1 = getVM(TestVM.SERVER1.vmNumber); + server2 = getVM(TestVM.SERVER2.vmNumber); + accessor = getVM(TestVM.ACCESSOR.vmNumber); + } + + private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) { + if (dataStoreRegionShortcut.isPersistent()) { + switch (dataStoreRegionShortcut) { + case PARTITION_PERSISTENT: + return PARTITION; + case PARTITION_PERSISTENT_OVERFLOW: + return PARTITION_OVERFLOW; + case PARTITION_REDUNDANT_PERSISTENT: + return PARTITION_REDUNDANT; + case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW: + return PARTITION_REDUNDANT_OVERFLOW; + } + } + + return dataStoreRegionShortcut; + } + + private void initAccessor(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut); + PartitionAttributes<String, String> attributes = + new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .setLocalMaxMemory(0) + .create(); + + cacheRule.getCache() + .<String, String>createRegionFactory(accessorShortcut) + .setPartitionAttributes(attributes) + .setEntryTimeToLive(expirationAttributes) + .setEntryIdleTimeout(expirationAttributes) + .create(REGION_NAME); + } + + private void initDataStore(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + PartitionAttributes<String, String> attributes = + new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .create(); + + cacheRule.getCache() + .<String, String>createRegionFactory(regionShortcut) + .setPartitionAttributes(attributes) + .setEntryTimeToLive(expirationAttributes) + .setEntryIdleTimeout(expirationAttributes) + .create(REGION_NAME); + + ExpiryTask.expiryTaskListener = new ExpirationListener(); + } + + private void parametrizedSetup(RegionShortcut regionShortcut, + ExpirationAttributes expirationAttributes) { + server1.invoke(() -> initDataStore(regionShortcut, expirationAttributes)); + server2.invoke(() -> initDataStore(regionShortcut, expirationAttributes)); + accessor.invoke(() -> initAccessor(regionShortcut, expirationAttributes)); + } + + private void waitForSilence() { + DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats(); + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + PartitionedRegionStats partitionedRegionStats = region.getPrStats(); + + await().untilAsserted(() -> { + assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0); + }); + } + + /** + * Populates the region and verifies the data on the selected VMs. + */ + private void populateRegion(VM feeder, int entryCount, List<VM> vms) { + feeder.invoke(() -> { + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); + }); + + vms.forEach(vm -> vm.invoke(() -> { + waitForSilence(); + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + + IntStream.range(0, entryCount) + .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i)); + })); + } + + /** + * Asserts that the region is empty on requested VMs. + */ + private void assertRegionIsEmpty(List<VM> vms) { + vms.forEach(vm -> vm.invoke(() -> { + waitForSilence(); + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + + assertThat(region.getLocalSize()).isEqualTo(0); + })); + } + + /** + * Asserts that the region data is consistent across buckets. + */ + private void assertRegionBucketsConsistency() throws ForceReattemptException { + waitForSilence(); + List<BucketDump> bucketDumps; + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + // Redundant copies + 1 primary. + int expectedCopies = region.getRedundantCopies() + 1; + + for (int bucketId = 0; bucketId < BUCKETS; bucketId++) { + bucketDumps = region.getAllBucketEntries(bucketId); + assertThat(bucketDumps.size()).as("Bucket " + bucketId + " should have " + expectedCopies + + " copies, but has " + bucketDumps.size()).isEqualTo(expectedCopies); + + // Check that all copies of the bucket have the same data. + if (bucketDumps.size() > 1) { + BucketDump firstDump = bucketDumps.get(0); + + for (int j = 1; j < bucketDumps.size(); j++) { + BucketDump otherDump = bucketDumps.get(j); + assertThat(otherDump.getValues()) + .as("Values for bucket " + bucketId + " on member " + otherDump.getMember() + + " are not consistent with member " + firstDump.getMember()) + .isEqualTo(firstDump.getValues()); + assertThat(otherDump.getVersions()) + .as("Versions for bucket " + bucketId + " on member " + otherDump.getMember() + + " are not consistent with member " + firstDump.getMember()) + .isEqualTo(firstDump.getVersions()); + } + } + } + } + + /** + * Register the MemberKiller CacheWriter on the given vms and cancel auto-reconnects. + */ + private void registerVMKillerAsCacheWriter(List<VM> vmsToBounce) { + vmsToBounce.forEach(vm -> vm.invoke(() -> { + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + region.getAttributesMutator().setCacheWriter(new MemberKiller()); + })); + } + + /** + * The test does the following (clear coordinator and expiration action are parametrized): + * - Populates the Partition Region (entries have expiration). + * - Verifies that the entries are synchronized on all members. + * - Clears the Partition Region once. + * - Asserts that, after the clear is finished: + * . No expiration tasks were executed. + * . All expiration tasks were cancelled. + * . Map of expiry tasks per bucket is empty. + * . The Partition Region is empty on all members. + */ + @Test + @TestCaseName(TEST_CASE_NAME) + @Parameters(method = "vmsRegionTypesAndExpirationActions") + public void clearShouldRemoveRegisteredExpirationTasks(TestVM coordinatorVM, + RegionShortcut regionShortcut, ExpirationAction expirationAction) { + final int entries = 500; + parametrizedSetup(regionShortcut, new ExpirationAttributes(EXPIRATION_TIME, expirationAction)); + populateRegion(accessor, entries, asList(accessor, server1, server2)); + + // Clear the region. + getVM(coordinatorVM.vmNumber).invoke(() -> { + Cache cache = cacheRule.getCache(); + cache.getRegion(REGION_NAME).clear(); + }); + + // Assert all expiration tasks were cancelled and none were executed. + asList(server1, server2).forEach(vm -> vm.invoke(() -> { + ExpirationListener listener = (ExpirationListener) EntryExpiryTask.expiryTaskListener; + assertThat(listener.tasksRan.get()).isEqualTo(0); + assertThat(listener.tasksCanceled.get()).isEqualTo(listener.tasksScheduled.get()); + + PartitionedRegionDataStore dataStore = + ((PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME)).getDataStore(); + Set<BucketRegion> bucketRegions = dataStore.getAllLocalBucketRegions(); + bucketRegions + .forEach(bucketRegion -> assertThat(bucketRegion.entryExpiryTasks.isEmpty()).isTrue()); + })); + + // Assert Region Buckets are consistent and region is empty, + accessor.invoke(this::assertRegionBucketsConsistency); + assertRegionIsEmpty(asList(accessor, server1, server1)); + } + + /** + * The test does the following (expiration action is parametrized): + * - Populates the Partition Region (entries have expiration). + * - Verifies that the entries are synchronized on all members. + * - Sets the {@link MemberKiller} as a {@link CacheWriter} to stop the coordinator VM while the + * clear is in progress. + * - Clears the Partition Region (at this point the coordinator is restarted). + * - Asserts that, after the clear is finished and the expiration time is reached: + * . No expiration tasks were cancelled. + * . All entries were removed due to the expiration. + * . The Partition Region Buckets are consistent on all members. + */ + @Test + @TestCaseName("[{index}] {method}(RegionType:{0}, ExpirationAction:{1})") + @Parameters(method = "regionTypesAndExpirationActions") + public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive( + RegionShortcut regionShortcut, ExpirationAction expirationAction) { + final int entries = 1000; + ExpirationAttributes expirationAttributes = + new ExpirationAttributes(EXPIRATION_TIME, expirationAction); + parametrizedSetup(regionShortcut, expirationAttributes); + populateRegion(accessor, entries, asList(accessor, server1, server2)); Review comment: The `populateRegion` methods uses the `accessor` to insert data into the region and, afterwards, it verifies that the entries are in sync in all three VMs. It's true that only `server1` and `server2` are needed as those are the only members hosting data, but I prefer to use all members for verification purposes (the `accessor` ultimately gets the data from other members, but it's harmless to execute the verification anyway). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org