[ 
https://issues.apache.org/jira/browse/GEODE-8894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285470#comment-17285470
 ] 

ASF GitHub Bot commented on GEODE-8894:
---------------------------------------

kirklund commented on a change in pull request #5978:
URL: https://github.com/apache/geode/pull/5978#discussion_r577067516



##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;

Review comment:
       Please start using AssertJ instead of JUnit Assert. The API is much 
richer and the failure messages are more detailed.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);

Review comment:
       Please use just `Region` or `InternalRegion`. In general, always prefer 
an Interface or less specific type. I've been gradually replacing all usages of 
`LocalRegion` with `InternalRegion`. Sometimes this involves adding another 
method signature to `InternalRegion`.
   
   The hope is to eventually have no references to `LocalRegion` outside of 
itself and its factory. Then the interfaces and class can be further refactored 
(possibly split-up or redesigned with Decorator pattern).

##########
File path: geode-core/src/main/java/org/apache/geode/Delta.java
##########
@@ -53,7 +51,27 @@
    * This method throws an {@link InvalidDeltaException} when the delta in the 
{@link DataInput}
    * cannot be applied to the object. GemFire automatically handles an {@link 
InvalidDeltaException}
    * by reattempting the update by sending the full application object.
-   *
    */
   void fromDelta(DataInput in) throws IOException, InvalidDeltaException;
+
+  /**

Review comment:
       Just to be sure... do we really want to document 
`gemfire.DELTAS_RECALCULATE_SIZE` in the javadocs of a User API class? Is it 
new? I think new system properties should be documented as `geode.PROPERTY` 
instead of `gemfire.`.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {

Review comment:
       I recommend removing `VM` and `MemberVM` from these methods. I realize 
it's a bit subjective but I really prefer not passing the VMs to method. I 
think this makes these methods more flexible and pushes the VM usage closer to 
the test method.
   
   From:
   ```
   public void someTest() {
     createRR(memberVM);
     ...
   }
   
   private void createRR(MemberVM memberVM) {
     memberVM.invoke("Create replicateRegion", () -> {
       Cache cache = ClusterStartupRule.getCache();
       ...
     }
   }
   ```
   To:
   ```
   public void someTest() {
     memberVM.invoke("Create replicateRegion", () -> createRR());
     // or
     memberVM.invoke("Create replicateRegion", this::createRR);
     ...
   }
   
   private void createRR() {
     Cache cache = ClusterStartupRule.getCache();
     ...
   }
   ```

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {

Review comment:
       This is creating random directories on the HD. It would be better to use 
`SerializableTemporaryFolder` Rule (from geode-junit). The Rule will clean it 
up for you and you can delete any tearDown code that would delete these dirs.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {

Review comment:
       This approach also makes it easier to change back and forth between 
static and non-static -- lots of DUnit tests such as the WAN tests have 
everything static which makes it harder to transition from extending TestCase 
classes to using JUnit Rules.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),

Review comment:
       Converting these to AssertJ, will end up a nice simple syntax:
   ```
       assertThat(value).isInstanceOf(CachedDeserializable.class);
   ```
   And the failure message will include the value and its type, so it does a 
lot of work for you.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);

Review comment:
       I would leave out all of these `assertNotNull(cache)` assertions. It's 
commonly recommended to just let code throw exceptions.
   
   More history on this in our code base: many of the previous devs on this 
code used `assertNotNull` very gratuitously because they didn't trust the code 
including `getCache` and `getSystem` in the original DUnit TestCase classes. 
Doing this is actually an Anti-Pattern in JUnit (and other frameworks).

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();

Review comment:
       These create region type methods are cleaner if you group the code by 
the factories. This becomes important in some tests that have a LOT of code to 
define a Region:
   ```
       PartitionAttributesFactory<Integer, TestDelta> partitionFactory = 
           new PartitionAttributesFactory<>();
       partitionFactory.setRedundantCopies(1);
   
       RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
       regionFactory.setDataPolicy(DataPolicy.PARTITION);
       regionFactory.setDiskSynchronous(true);
       regionFactory.setPartitionAttributes(partitionFactory.create());
       regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
       
       regionFactory.create(TEST_REGION_NAME);
   ```
   I alphabetize the parameter setters so it's easier to search through if it's 
a long list of attributes being set. I also inline the creation of supporting 
attributes like I did above for partitionFactory.
   
   Also, you might want to switch to using `RegionShortcut` and 
`ClientRegionShortcut` in tests as these are the newer preferred User APIs for 
creating regions:
   ```
       RegionFactory<Integer, TestDelta> regionFactory = 
           cache.createRegionFactory(RegionShortcut.PARTITION);
       regionFactory.setDiskSynchronous(true);
       regionFactory.setPartitionAttributes(partitionFactory.create());
       regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
   ```
   None of this is necessary... just sharing some of what I do in tests.

##########
File path: geode-core/src/main/java/org/apache/geode/Delta.java
##########
@@ -53,7 +51,27 @@
    * This method throws an {@link InvalidDeltaException} when the delta in the 
{@link DataInput}
    * cannot be applied to the object. GemFire automatically handles an {@link 
InvalidDeltaException}
    * by reattempting the update by sending the full application object.
-   *
    */
   void fromDelta(DataInput in) throws IOException, InvalidDeltaException;
+
+  /**

Review comment:
       The JDK classes use a single unclosed `<p>` at the start of each 
paragraph in javadocs:
   ```
   /**
      * By default, entry sizes are not recalculated when deltas are applied. 
This optimizes for the
      * case where the size of an entry does not change. However, if an entry 
size does increase or
      * decrease, this default behavior can result in the memory usage 
statistics becoming inaccurate.
      * These are used to monitor the health of Geode instances, and for 
balancing memory usage across
      * partitioned regions.
      *
      * <p>
      * There is a system property, gemfire.DELTAS_RECALCULATE_SIZE, which can 
be used to cause all
      * deltas to trigger entry size recalculation when deltas are applied. By 
default, this is set
      * to 'false' because of potential performance impacts when every delta 
triggers a recalculation.
      *
      * <p>
      * To allow entry size recalculation on a per-delta basis, classes that 
extend the Delta interface
      * should override this method to return 'true'. This may impact 
performance of specific delta
      * types, but will not globally affect the performance of other Geode 
delta operations.
      *
      * @since 1.14
      */
    ```  

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {

Review comment:
       ```
     @Rule
     public SerializableTemporaryFolder temporaryFolder = new 
SerializableTemporaryFolder();
   
     @Test
     public void usesDiskDirs() throws Exception {
       File[] files = getMyDiskDirs();
     }
   
     private File[] getMyDiskDirs() throws IOException {
       long random = new Random().nextLong();
       File file = temporaryFolder.newFolder(String.valueOf(random));
       return new File[] {file};
     }
   ```

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),

Review comment:
       AssertJ would be:
   ```
   assertThat(value).isInstanceOf(CachedDeserializable.class);
   
   Object deserializedValue = (CachedDeserializable) value.getValue();
   assertThat(deserializedValue).isInstanceOf(byte[].class);
   ```
   Extracting `value.getValue()` to a variable is very subjective and maybe 
makes things more readable -- although this line is small so it's not helping 
as much as it does in some other places.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow individual deltas to trigger bucket size recalculation
> ------------------------------------------------------------
>
>                 Key: GEODE-8894
>                 URL: https://issues.apache.org/jira/browse/GEODE-8894
>             Project: Geode
>          Issue Type: New Feature
>          Components: core, serialization
>    Affects Versions: 1.14.0
>            Reporter: Raymond Ingles
>            Priority: Major
>              Labels: pull-request-available
>
> The Redis subsystem uses Deltas heavily, but by default deltas do not trigger 
> an update to the size of their buckets. This leads to incorrect memory usage 
> accounting over the long term, especially with the use of Redis commands like 
> "APPEND".
> It is possible to set the system property "DELTAS_RECALCULATE_SIZE", but this 
> is a global value that would affect the processing of all deltas, including 
> non-Redis operations.
> Instead, we will add a new default method to the Delta interface, that can be 
> overridden by individual Delta implementations (such as Redis). This will 
> trigger the same behavior as DELTAS_RECALCULATE_SIZE, but on a per-delta 
> basis. Thus, other Geode operations will not force bucket size recalculations 
> unless the global property is set, but Redis statistics will be correct.
> Other types of delta operations may find this useful in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to