[ 
https://issues.apache.org/jira/browse/GEODE-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097529#comment-17097529
 ] 

ASF GitHub Bot commented on GEODE-8029:
---------------------------------------

DonalEvans commented on a change in pull request #5037:
URL: https://github.com/apache/geode/pull/5037#discussion_r418620200



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
##########
@@ -4279,6 +4279,28 @@ public static void validate(String name, File[] dirs) 
throws Exception {
     }
   }
 
+  /**
+   * Validates the disk-store in offline mode, and returns the validated 
DiskStore instance.
+   * This method is an "almost exact copy" of {@link 
DiskStoreImpl#validate(String, File[])}.

Review comment:
       Instead of having two almost identical methods, might it be better to 
have one call the other to prevent code duplication? Or maybe mark this one as 
a test method for now?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
##########
@@ -937,14 +937,20 @@ void initAfterRecovery(boolean offline) {
         // this.crf.raf.seek(this.crf.currSize);
       } else if (!offline) {
         // drf exists but crf has been deleted (because it was empty).
-        // I don't think the drf needs to be opened. It is only used during
-        // recovery.
-        // At some point the compacter my identify that it can be deleted.
         this.crf.RAFClosed = true;
         deleteCRF();
+
+        // See GEODE-8029.
+        // The drf file needs to be deleted, specially when the disk-store is 
*only* used by

Review comment:
       This should be "especially".

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.persistence.OplogType;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Tests to verify WAN functionality when the gateway-sender(s) have isolated, 
non-shared with
+ * other region(s), disk-store(s).
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest 
implements Serializable {
+  private static final String REGION_NAME = "TestRegion";
+  private static final String DISK_STORE_ID = "testDisk";
+  private static final String GATEWAY_SENDER_ID = "testSender";
+  private static final String TEST_CASE_NAME = "[{index}] 
{method}(RegionType:{0}, Parallel:{1})";
+  private int site1Port, site2Port;
+  private VM serverCluster1, serverCluster2;
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDirRule = new 
DistributedDiskDirRule();
+
+  private Properties createLocatorConfiguration(int distributedSystemId, int 
localLocatorPort,
+      int remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, 
String.valueOf(distributedSystemId));
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + localLocatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+
+    return config;
+  }
+
+  private Properties createServerConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+    return config;
+  }
+
+  private void createDiskStore() {
+    String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
+    File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
+    DiskStoreFactory diskStoreFactory = 
cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setAllowForceCompaction(true);
+    diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
+    diskStoreFactory.create(DISK_STORE_ID);
+  }
+
+  private void createRegion(RegionShortcut regionShortcut) {
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(regionShortcut)
+        .create(REGION_NAME);
+  }
+
+  private void createGatewayReceiver() {
+    GatewayReceiverFactory gatewayReceiverFactory =
+        cacheRule.getCache().createGatewayReceiverFactory();
+    gatewayReceiverFactory.setManualStart(false);
+    gatewayReceiverFactory.create();
+  }
+
+  private void createGatewaySender(boolean parallel, int 
remoteDistributedSystemId) {
+    GatewaySenderFactory gatewaySenderFactory = 
cacheRule.getCache().createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(parallel);
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setPersistenceEnabled(true);
+    gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
+    gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
+  }
+
+  private void createServerWithRegionAndGatewayReceiver(RegionShortcut 
regionShortcut) {
+    createGatewayReceiver();
+    createRegion(regionShortcut);
+  }
+
+  private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut 
regionShortcut,
+      int remoteDistributedSystemId, boolean parallel) {
+    createDiskStore();
+    createRegion(regionShortcut);
+    createGatewaySender(parallel, remoteDistributedSystemId);
+    Region<String, String> region = 
cacheRule.getCache().getRegion(REGION_NAME);
+    region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
+  }
+
+  private void gracefullyDisconnect() {
+    
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+    InternalDistributedSystem.getConnectedInstance().disconnect();
+    await()
+        .untilAsserted(() -> 
assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+  }
+
+  private void awaitForQueueSize(int queueSize) {
+    GatewaySender gatewaySender = 
cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
+    await().untilAsserted(() -> {
+      Set<RegionQueue> queues = ((AbstractGatewaySender) 
gatewaySender).getQueues();
+      int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
+      assertThat(queueSize).isEqualTo(totalSize);
+    });
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] regionAndGatewayTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    parameters.add(new Object[] {RegionShortcut.PARTITION, true});
+    parameters.add(new Object[] {RegionShortcut.REPLICATE, false});
+
+    return parameters.toArray();
+  }
+
+  @Before
+  public void setUp() {
+    VM locatorCluster1 = getVM(0);
+    serverCluster1 = getVM(1);
+    VM locatorCluster2 = getVM(2);
+    serverCluster2 = getVM(3);
+
+    int[] ports = getRandomAvailableTCPPortsForDUnitSite(2);
+    site1Port = ports[0];
+    site2Port = ports[1];
+
+    // Start 2 sites, one locator and one server per site.
+    locatorCluster1
+        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(1, 
site1Port, site2Port)));
+    locatorCluster2
+        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(2, 
site2Port, site1Port)));
+
+    serverCluster1.invoke(() -> 
cacheRule.createCache(createServerConfiguration(site1Port)));
+    serverCluster2.invoke(() -> 
cacheRule.createCache(createServerConfiguration(site2Port)));
+  }
+
+  /**
+   * The test executes the following:
+   * - Creates region and gateway-receiver on cluster2.
+   * - Creates the region and gateway-sender on cluster1.
+   * - Populates the region and waits until WAN replication has finished.
+   * - Restarts server on cluster1, and stops it afterwards (the initial 
compaction occurs during
+   * startup and the disk validation is done offline).
+   * - Asserts that there are no orphaned drf files, nor compact-able 
records on the disk-store.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "regionAndGatewayTypes")
+  public void 
diskStoreShouldBeCompactedOnMemberRestartWhenAllEventsHaveBeenDispatched(
+      RegionShortcut regionShortcut, boolean parallel) throws Exception {
+    final int entries = 100;
+
+    // Create Region and Receiver on Cluster2
+    serverCluster2.invoke(() -> 
createServerWithRegionAndGatewayReceiver(regionShortcut));
+
+    // Create Region, DiskStore and Gateway on Cluster1
+    String diskStorePath = serverCluster1.invoke(() -> {
+      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, 
parallel);
+      DiskStore diskStore = cacheRule.getCache().findDiskStore(DISK_STORE_ID);
+      Region<String, String> region = 
cacheRule.getCache().getRegion(REGION_NAME);
+
+      // Insert entries and wait for WAN replication to finish.
+      IntStream.range(0, entries).forEach(value -> region.put("Key" + value, 
"Value" + value));
+      awaitForQueueSize(0);
+
+      return diskStore.getDiskDirs()[0].getAbsolutePath();
+    });
+
+    // Wait for Cluster2 to receive all events.
+    serverCluster2.invoke(() -> await().untilAsserted(
+        () -> 
assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(entries)));
+
+    // Restart and Stop Server on Cluster1
+    serverCluster1.invoke(() -> {
+      gracefullyDisconnect();
+      cacheRule.createCache(createServerConfiguration(site1Port));
+      createServerWithRegionAndPersistentGatewaySender(regionShortcut, 2, 
parallel);
+      gracefullyDisconnect();
+    });
+
+    // There should be no orphaned drf files, nor compact-able records on 
the disk-store.
+    File gatewayDiskStore = new File(diskStorePath);
+    assertThat(gatewayDiskStore.list())
+        .hasSize(3)
+        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + ".if")
+        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.drf")
+        .contains(OplogType.BACKUP.getPrefix() + DISK_STORE_ID + "_2.crf");
+
+    DiskStore diskStore =
+        DiskStoreImpl.offlineValidate(DISK_STORE_ID, new File[] 
{gatewayDiskStore});
+    assertThat(((DiskStoreImpl) diskStore).getLiveEntryCount()).isEqualTo(0);
+    assertThat(((DiskStoreImpl) diskStore).getDeadRecordCount()).isEqualTo(0);
+  }
+
+  /**
+   * The test executes the following:
+   * - Creates the region and a gateway-sender on cluster2.
+   * - Populates the region and waits until all events have been enqueued.
+   * - Restarts server on cluster2 and stops it afterwards (the initial 
compaction occurs during
+   * startup and the validation is done offline).
+   * - Verifies that there are no orphaned files, nor compact-able records 
on the disk-store.
+   * - Creates the region and a gateway-receiver on cluster1.
+   * - Starts server on cluster2 again and waits for WAN replication to finish.
+   * - Restarts server on cluster2, and stops it afterwards (the initial 
compaction occurs during
+   * startup and the validation is done offline).
+   * - Asserts that there are no orphaned drf files, nor compact-able 
records on the disk-store.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "regionAndGatewayTypes")
+  public void anotherTest(RegionShortcut regionShortcut, boolean parallel) 
throws Exception {

Review comment:
       This test should be renamed.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.persistence.OplogType;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Tests to verify WAN functionality when the gateway-sender(s) have isolated, 
non-shared with
+ * other region(s), disk-store(s).
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest 
implements Serializable {
+  private static final String REGION_NAME = "TestRegion";
+  private static final String DISK_STORE_ID = "testDisk";
+  private static final String GATEWAY_SENDER_ID = "testSender";
+  private static final String TEST_CASE_NAME = "[{index}] 
{method}(RegionType:{0}, Parallel:{1})";
+  private int site1Port, site2Port;
+  private VM serverCluster1, serverCluster2;
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDirRule = new 
DistributedDiskDirRule();
+
+  private Properties createLocatorConfiguration(int distributedSystemId, int 
localLocatorPort,
+      int remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, 
String.valueOf(distributedSystemId));
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + localLocatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+
+    return config;
+  }
+
+  private Properties createServerConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+    return config;
+  }
+
+  private void createDiskStore() {
+    String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
+    File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
+    DiskStoreFactory diskStoreFactory = 
cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setAllowForceCompaction(true);
+    diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
+    diskStoreFactory.create(DISK_STORE_ID);
+  }
+
+  private void createRegion(RegionShortcut regionShortcut) {
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(regionShortcut)
+        .create(REGION_NAME);
+  }
+
+  private void createGatewayReceiver() {
+    GatewayReceiverFactory gatewayReceiverFactory =
+        cacheRule.getCache().createGatewayReceiverFactory();
+    gatewayReceiverFactory.setManualStart(false);
+    gatewayReceiverFactory.create();
+  }
+
+  private void createGatewaySender(boolean parallel, int 
remoteDistributedSystemId) {
+    GatewaySenderFactory gatewaySenderFactory = 
cacheRule.getCache().createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(parallel);
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setPersistenceEnabled(true);
+    gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
+    gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
+  }
+
+  private void createServerWithRegionAndGatewayReceiver(RegionShortcut 
regionShortcut) {
+    createGatewayReceiver();
+    createRegion(regionShortcut);
+  }
+
+  private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut 
regionShortcut,
+      int remoteDistributedSystemId, boolean parallel) {
+    createDiskStore();
+    createRegion(regionShortcut);
+    createGatewaySender(parallel, remoteDistributedSystemId);
+    Region<String, String> region = 
cacheRule.getCache().getRegion(REGION_NAME);
+    region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
+  }
+
+  private void gracefullyDisconnect() {
+    
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+    InternalDistributedSystem.getConnectedInstance().disconnect();
+    await()
+        .untilAsserted(() -> 
assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+  }
+
+  private void awaitForQueueSize(int queueSize) {
+    GatewaySender gatewaySender = 
cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
+    await().untilAsserted(() -> {
+      Set<RegionQueue> queues = ((AbstractGatewaySender) 
gatewaySender).getQueues();
+      int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
+      assertThat(queueSize).isEqualTo(totalSize);
+    });
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] regionAndGatewayTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    parameters.add(new Object[] {RegionShortcut.PARTITION, true});
+    parameters.add(new Object[] {RegionShortcut.REPLICATE, false});

Review comment:
       What about the cases where the region is `PARTITION` and parallel is 
`false`, or region is `REPLICATE` and parallel is `true`? Do those need to be 
covered?

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.persistence.OplogType;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Tests to verify WAN functionality when the gateway-sender(s) have isolated, 
non-shared with
+ * other region(s), disk-store(s).
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest 
implements Serializable {
+  private static final String REGION_NAME = "TestRegion";
+  private static final String DISK_STORE_ID = "testDisk";
+  private static final String GATEWAY_SENDER_ID = "testSender";
+  private static final String TEST_CASE_NAME = "[{index}] 
{method}(RegionType:{0}, Parallel:{1})";
+  private int site1Port, site2Port;
+  private VM serverCluster1, serverCluster2;
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDirRule = new 
DistributedDiskDirRule();
+
+  private Properties createLocatorConfiguration(int distributedSystemId, int 
localLocatorPort,
+      int remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, 
String.valueOf(distributedSystemId));
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + localLocatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+
+    return config;
+  }
+
+  private Properties createServerConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+    return config;
+  }
+
+  private void createDiskStore() {
+    String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
+    File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
+    DiskStoreFactory diskStoreFactory = 
cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setAllowForceCompaction(true);
+    diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
+    diskStoreFactory.create(DISK_STORE_ID);
+  }
+
+  private void createRegion(RegionShortcut regionShortcut) {
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(regionShortcut)
+        .create(REGION_NAME);
+  }
+
+  private void createGatewayReceiver() {
+    GatewayReceiverFactory gatewayReceiverFactory =
+        cacheRule.getCache().createGatewayReceiverFactory();
+    gatewayReceiverFactory.setManualStart(false);
+    gatewayReceiverFactory.create();
+  }
+
+  private void createGatewaySender(boolean parallel, int 
remoteDistributedSystemId) {
+    GatewaySenderFactory gatewaySenderFactory = 
cacheRule.getCache().createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(parallel);
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setPersistenceEnabled(true);
+    gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
+    gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
+  }
+
+  private void createServerWithRegionAndGatewayReceiver(RegionShortcut 
regionShortcut) {
+    createGatewayReceiver();
+    createRegion(regionShortcut);
+  }
+
+  private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut 
regionShortcut,
+      int remoteDistributedSystemId, boolean parallel) {
+    createDiskStore();
+    createRegion(regionShortcut);
+    createGatewaySender(parallel, remoteDistributedSystemId);
+    Region<String, String> region = 
cacheRule.getCache().getRegion(REGION_NAME);
+    region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
+  }
+
+  private void gracefullyDisconnect() {
+    
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+    InternalDistributedSystem.getConnectedInstance().disconnect();
+    await()
+        .untilAsserted(() -> 
assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+  }
+
+  private void awaitForQueueSize(int queueSize) {
+    GatewaySender gatewaySender = 
cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
+    await().untilAsserted(() -> {
+      Set<RegionQueue> queues = ((AbstractGatewaySender) 
gatewaySender).getQueues();
+      int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
+      assertThat(queueSize).isEqualTo(totalSize);
+    });
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] regionAndGatewayTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    parameters.add(new Object[] {RegionShortcut.PARTITION, true});
+    parameters.add(new Object[] {RegionShortcut.REPLICATE, false});
+
+    return parameters.toArray();
+  }
+
+  @Before
+  public void setUp() {
+    VM locatorCluster1 = getVM(0);
+    serverCluster1 = getVM(1);
+    VM locatorCluster2 = getVM(2);
+    serverCluster2 = getVM(3);

Review comment:
       Is one server per cluster enough to test the behaviour with a parallel 
gateway sender here? Or is this a situation where that's not relevant?

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySenderFactory;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.persistence.OplogType;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Tests to verify WAN functionality when the gateway-sender(s) have isolated, 
non-shared with
+ * other region(s), disk-store(s).
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PersistentGatewaySenderWithIsolatedDiskStoreDistributedTest 
implements Serializable {
+  private static final String REGION_NAME = "TestRegion";
+  private static final String DISK_STORE_ID = "testDisk";
+  private static final String GATEWAY_SENDER_ID = "testSender";
+  private static final String TEST_CASE_NAME = "[{index}] 
{method}(RegionType:{0}, Parallel:{1})";
+  private int site1Port, site2Port;
+  private VM serverCluster1, serverCluster2;
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDirRule = new 
DistributedDiskDirRule();
+
+  private Properties createLocatorConfiguration(int distributedSystemId, int 
localLocatorPort,
+      int remoteLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(DISTRIBUTED_SYSTEM_ID, 
String.valueOf(distributedSystemId));
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+    config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + 
']');
+    config.setProperty(START_LOCATOR,
+        "localhost[" + localLocatorPort + 
"],server=true,peer=true,hostname-for-clients=localhost");
+
+    return config;
+  }
+
+  private Properties createServerConfiguration(int localLocatorPort) {
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "localhost[" + localLocatorPort + ']');
+
+    return config;
+  }
+
+  private void createDiskStore() {
+    String basePath = distributedDiskDirRule.getDiskDir().getAbsolutePath();
+    File diskDirectory = new File(basePath + File.separator + DISK_STORE_ID);
+    DiskStoreFactory diskStoreFactory = 
cacheRule.getCache().createDiskStoreFactory();
+    diskStoreFactory.setAutoCompact(true);
+    diskStoreFactory.setAllowForceCompaction(true);
+    diskStoreFactory.setDiskDirs(new File[] {diskDirectory});
+    diskStoreFactory.create(DISK_STORE_ID);
+  }
+
+  private void createRegion(RegionShortcut regionShortcut) {
+    cacheRule.getCache()
+        .<String, String>createRegionFactory(regionShortcut)
+        .create(REGION_NAME);
+  }
+
+  private void createGatewayReceiver() {
+    GatewayReceiverFactory gatewayReceiverFactory =
+        cacheRule.getCache().createGatewayReceiverFactory();
+    gatewayReceiverFactory.setManualStart(false);
+    gatewayReceiverFactory.create();
+  }
+
+  private void createGatewaySender(boolean parallel, int 
remoteDistributedSystemId) {
+    GatewaySenderFactory gatewaySenderFactory = 
cacheRule.getCache().createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(parallel);
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setPersistenceEnabled(true);
+    gatewaySenderFactory.setDiskStoreName(DISK_STORE_ID);
+    gatewaySenderFactory.create(GATEWAY_SENDER_ID, remoteDistributedSystemId);
+  }
+
+  private void createServerWithRegionAndGatewayReceiver(RegionShortcut 
regionShortcut) {
+    createGatewayReceiver();
+    createRegion(regionShortcut);
+  }
+
+  private void createServerWithRegionAndPersistentGatewaySender(RegionShortcut 
regionShortcut,
+      int remoteDistributedSystemId, boolean parallel) {
+    createDiskStore();
+    createRegion(regionShortcut);
+    createGatewaySender(parallel, remoteDistributedSystemId);
+    Region<String, String> region = 
cacheRule.getCache().getRegion(REGION_NAME);
+    region.getAttributesMutator().addGatewaySenderId(GATEWAY_SENDER_ID);
+  }
+
+  private void gracefullyDisconnect() {
+    
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+    InternalDistributedSystem.getConnectedInstance().disconnect();
+    await()
+        .untilAsserted(() -> 
assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+  }
+
+  private void awaitForQueueSize(int queueSize) {
+    GatewaySender gatewaySender = 
cacheRule.getCache().getGatewaySender(GATEWAY_SENDER_ID);
+    await().untilAsserted(() -> {
+      Set<RegionQueue> queues = ((AbstractGatewaySender) 
gatewaySender).getQueues();
+      int totalSize = queues.stream().mapToInt(RegionQueue::size).sum();
+      assertThat(queueSize).isEqualTo(totalSize);
+    });
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] regionAndGatewayTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    parameters.add(new Object[] {RegionShortcut.PARTITION, true});
+    parameters.add(new Object[] {RegionShortcut.REPLICATE, false});
+
+    return parameters.toArray();
+  }
+
+  @Before
+  public void setUp() {
+    VM locatorCluster1 = getVM(0);
+    serverCluster1 = getVM(1);
+    VM locatorCluster2 = getVM(2);
+    serverCluster2 = getVM(3);
+
+    int[] ports = getRandomAvailableTCPPortsForDUnitSite(2);
+    site1Port = ports[0];
+    site2Port = ports[1];
+
+    // Start 2 sites, one locator and one server per site.
+    locatorCluster1
+        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(1, 
site1Port, site2Port)));
+    locatorCluster2
+        .invoke(() -> cacheRule.createCache(createLocatorConfiguration(2, 
site2Port, site1Port)));
+
+    serverCluster1.invoke(() -> 
cacheRule.createCache(createServerConfiguration(site1Port)));
+    serverCluster2.invoke(() -> 
cacheRule.createCache(createServerConfiguration(site2Port)));
+  }
+
+  /**
+   * The test executes the following:
+   * - Creates region and gateway-receiver on cluster2.
+   * - Creates the region and gateway-sender on cluster1.
+   * - Populates the region and waits until WAN replication has finished.
+   * - Restarts server on cluster1, and stops it afterwards (the initial 
compaction occurs during
+   * startup and the disk validation is done offline).
+   * - Asserts that there are no orphaned drf files, nor compactable 
records on the disk-store.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "regionAndGatewayTypes")
+  public void 
diskStoreShouldBeCompactedOnMemberRestartWhenAllEventsHaveBeenDispatched(
+      RegionShortcut regionShortcut, boolean parallel) throws Exception {
+    final int entries = 100;

Review comment:
       Small nitpick. The number of entries in this test is 100, but in the 
other test in this class it's 1000. Could they be the same, for consistency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> java.lang.IllegalArgumentException: Too large (805306401 expected elements 
> with load factor 0.75)
> -------------------------------------------------------------------------------------------------
>
>                 Key: GEODE-8029
>                 URL: https://issues.apache.org/jira/browse/GEODE-8029
>             Project: Geode
>          Issue Type: Bug
>          Components: configuration, core, gfsh
>    Affects Versions: 1.9.0
>            Reporter: Jagadeesh sivasankaran
>            Assignee: Juan Ramos
>            Priority: Major
>              Labels: GeodeCommons, caching-applications
>         Attachments: Screen Shot 2020-04-27 at 12.21.19 PM.png, Screen Shot 
> 2020-04-27 at 12.21.19 PM.png, server02.log
>
>
> We have a cluster of three Geode locators and three cache servers running on 
> CentOS. Today (April 27), after patching our CentOS servers, all locators and 
> two of the cache servers came up, but one cache server failed to start. The 
> exception details are below. Please let me know how to resolve this issue and 
> whether any configuration changes to the disk store are needed.
>  
>  
> Starting a Geode Server in /app/provServerHO2...
> ....................................................................................................................................................................................................................The
>  Cache Server process terminated unexpectedly with exit status 1. Please 
> refer to the log file in /app/provServerHO2 for full details.
> Exception in thread "main" java.lang.IllegalArgumentException: Too large 
> (805306401 expected elements with load factor 0.75)
> at it.unimi.dsi.fastutil.HashCommon.arraySize(HashCommon.java:222)
> at it.unimi.dsi.fastutil.ints.IntOpenHashSet.add(IntOpenHashSet.java:308)
> at 
> org.apache.geode.internal.cache.DiskStoreImpl$OplogEntryIdSet.add(DiskStoreImpl.java:3474)
> at org.apache.geode.internal.cache.Oplog.readDelEntry(Oplog.java:3007)
> at org.apache.geode.internal.cache.Oplog.recoverDrf(Oplog.java:1500)
> at 
> org.apache.geode.internal.cache.PersistentOplogSet.recoverOplogs(PersistentOplogSet.java:445)
> at 
> org.apache.geode.internal.cache.PersistentOplogSet.recoverRegionsThatAreReady(PersistentOplogSet.java:369)
> at 
> org.apache.geode.internal.cache.DiskStoreImpl.recoverRegionsThatAreReady(DiskStoreImpl.java:2053)
> at 
> org.apache.geode.internal.cache.DiskStoreImpl.initializeIfNeeded(DiskStoreImpl.java:2041)
> security-peer-auth-init=
> at 
> org.apache.geode.internal.cache.DiskStoreImpl.doInitialRecovery(DiskStoreImpl.java:2046)
> at 
> org.apache.geode.internal.cache.DiskStoreFactoryImpl.initializeDiskStore(DiskStoreFactoryImpl.java:184)
> at 
> org.apache.geode.internal.cache.DiskStoreFactoryImpl.create(DiskStoreFactoryImpl.java:150)
> at 
> org.apache.geode.internal.cache.xmlcache.CacheCreation.createDiskStore(CacheCreation.java:794)
> at 
> org.apache.geode.internal.cache.xmlcache.CacheCreation.initializePdxDiskStore(CacheCreation.java:785)
> at 
> org.apache.geode.internal.cache.xmlcache.CacheCreation.create(CacheCreation.java:509)
> at 
> org.apache.geode.internal.cache.xmlcache.CacheXmlParser.create(CacheXmlParser.java:337)
> at 
> org.apache.geode.internal.cache.GemFireCacheImpl.loadCacheXml(GemFireCacheImpl.java:4272)
> at 
> org.apache.geode.internal.cache.ClusterConfigurationLoader.applyClusterXmlConfiguration(ClusterConfigurationLoader.java:197)
> at 
> org.apache.geode.internal.cache.GemFireCacheImpl.applyJarAndXmlFromClusterConfig(GemFireCacheImpl.java:1240)
> at 
> org.apache.geode.internal.cache.GemFireCacheImpl.initialize(GemFireCacheImpl.java:1206)
> at 
> org.apache.geode.internal.cache.InternalCacheBuilder.create(InternalCacheBuilder.java:207)
> at 
> org.apache.geode.internal.cache.InternalCacheBuilder.create(InternalCacheBuilder.java:164)
> at org.apache.geode.cache.CacheFactory.create(CacheFactory.java:139)
> at 
> org.apache.geode.distributed.internal.DefaultServerLauncherCacheProvider.createCache(DefaultServerLauncherCacheProvider.java:52)
> at 
> org.apache.geode.distributed.ServerLauncher.createCache(ServerLauncher.java:869)
> at org.apache.geode.distributed.ServerLauncher.start(ServerLauncher.java:786)
> at org.apache.geode.distributed.ServerLauncher.run(ServerLauncher.java:716)
> at org.apache.geode.distributed.ServerLauncher.main(ServerLauncher.java:236)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to