[ 
https://issues.apache.org/jira/browse/GEODE-8894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280001#comment-17280001
 ] 

ASF GitHub Bot commented on GEODE-8894:
---------------------------------------

DonalEvans commented on a change in pull request #5978:
URL: https://github.com/apache/geode/pull/5978#discussion_r571248550



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
##########
@@ -1737,7 +1737,6 @@ private void extractDeltaIntoEvent(Object value, 
EntryEventImpl event) {
               throw new DeltaSerializationException("Caught exception while 
sending delta", e);
             }
             event.setDeltaBytes(hdos.toByteArray());
-            getCachePerfStats().endDeltaPrepared(start);

Review comment:
       With the removal of this line, the `start` variable on line 1731 can now 
be removed as it's no longer used.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);
+      if (o instanceof TestObject) {
+        invocations.incrementAndGet();
+        return ((TestObject) o).sizeForSizer;
+      }
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      if (o instanceof TestKey) {
+        invocations.incrementAndGet();
+        return ((TestKey) o).value.length();
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() 
+ ", object=" + o);
+    }
+  }
+
+  private static class TestKey implements DataSerializable {

Review comment:
       This class is no longer instantiated anywhere in the test, so it can be 
removed.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);
+      if (o instanceof TestObject) {
+        invocations.incrementAndGet();
+        return ((TestObject) o).sizeForSizer;
+      }
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      if (o instanceof TestKey) {
+        invocations.incrementAndGet();
+        return ((TestKey) o).value.length();
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() 
+ ", object=" + o);
+    }
+  }
+
+  private static class TestKey implements DataSerializable {
+    String value;
+
+    public TestKey(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      value = DataSerializer.readString(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(value, out);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((value == null) ? 0 : value.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestKey)) {
+        return false;
+      }
+      TestKey other = (TestKey) obj;
+      if (value == null) {
+        return other.value == null;
+      } else {
+        return value.equals(other.value);
+      }
+    }
+
+  }
+
+  private static class TestObject implements DataSerializable {
+    public int sizeForSizer;
+    public int sizeForSerialization;
+
+    public TestObject(int sizeForSerialization, int sizeForSizer) {
+      super();
+      this.sizeForSizer = sizeForSizer;
+      this.sizeForSerialization = sizeForSerialization;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      sizeForSizer = in.readInt();
+      sizeForSerialization = in.readInt();
+      // We don't actually need these things.
+      in.skipBytes(sizeForSerialization);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      out.writeInt(sizeForSizer);
+      out.writeInt(sizeForSerialization);
+      out.write(new byte[sizeForSerialization]);
+
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + sizeForSerialization;
+      result = prime * result + sizeForSizer;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestObject)) {
+        return false;
+      }
+      TestObject other = (TestObject) obj;
+      if (sizeForSerialization != other.sizeForSerialization) {
+        return false;
+      }
+      return sizeForSizer == other.sizeForSizer;
+    }
+
+    @Override
+    public String toString() {
+      return "TestObject [sizeForSerialization=" + sizeForSerialization + ", 
sizeForSizer="
+          + sizeForSizer + "]";
+    }
+  }
+
+  public static class TestCacheListener<K, V> extends CacheListenerAdapter<K, 
V> {
+
+    @Override
+    public void afterCreate(EntryEvent<K, V> event) {
+      // Make sure we deserialize the new value
+      logger.debug("invoked afterCreate with " + event);
+      logger.info(String.format("%s",

Review comment:
       The use of `String.format()` here is unnecessary, since a bare `"%s"` format string adds nothing to its argument.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);
+      if (o instanceof TestObject) {
+        invocations.incrementAndGet();
+        return ((TestObject) o).sizeForSizer;
+      }
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      if (o instanceof TestKey) {
+        invocations.incrementAndGet();
+        return ((TestKey) o).value.length();
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() 
+ ", object=" + o);
+    }
+  }
+
+  private static class TestKey implements DataSerializable {
+    String value;
+
+    public TestKey(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      value = DataSerializer.readString(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(value, out);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((value == null) ? 0 : value.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestKey)) {
+        return false;
+      }
+      TestKey other = (TestKey) obj;
+      if (value == null) {
+        return other.value == null;
+      } else {
+        return value.equals(other.value);
+      }
+    }
+
+  }
+
+  private static class TestObject implements DataSerializable {
+    public int sizeForSizer;
+    public int sizeForSerialization;
+
+    public TestObject(int sizeForSerialization, int sizeForSizer) {
+      super();
+      this.sizeForSizer = sizeForSizer;
+      this.sizeForSerialization = sizeForSerialization;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      sizeForSizer = in.readInt();
+      sizeForSerialization = in.readInt();
+      // We don't actually need these things.
+      in.skipBytes(sizeForSerialization);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      out.writeInt(sizeForSizer);
+      out.writeInt(sizeForSerialization);
+      out.write(new byte[sizeForSerialization]);
+
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + sizeForSerialization;
+      result = prime * result + sizeForSizer;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestObject)) {
+        return false;
+      }
+      TestObject other = (TestObject) obj;
+      if (sizeForSerialization != other.sizeForSerialization) {
+        return false;
+      }
+      return sizeForSizer == other.sizeForSizer;
+    }
+
+    @Override
+    public String toString() {
+      return "TestObject [sizeForSerialization=" + sizeForSerialization + ", 
sizeForSizer="
+          + sizeForSizer + "]";
+    }
+  }
+
+  public static class TestCacheListener<K, V> extends CacheListenerAdapter<K, 
V> {
+
+    @Override
+    public void afterCreate(EntryEvent<K, V> event) {
+      // Make sure we deserialize the new value
+      logger.debug("invoked afterCreate with " + event);

Review comment:
       Since this is a test, the only time this logging should ever really need 
to be looked at is if the test fails. It would make life easier in that 
scenario if this were logged at info level, so that as much information as 
possible is available to anyone looking at a failure without having to rerun 
the test with finer logging enabled.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);
+      if (o instanceof TestObject) {
+        invocations.incrementAndGet();
+        return ((TestObject) o).sizeForSizer;
+      }
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      if (o instanceof TestKey) {
+        invocations.incrementAndGet();
+        return ((TestKey) o).value.length();
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() 
+ ", object=" + o);
+    }
+  }
+
+  private static class TestKey implements DataSerializable {
+    String value;
+
+    public TestKey(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      value = DataSerializer.readString(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(value, out);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((value == null) ? 0 : value.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestKey)) {
+        return false;
+      }
+      TestKey other = (TestKey) obj;
+      if (value == null) {
+        return other.value == null;
+      } else {
+        return value.equals(other.value);
+      }
+    }
+
+  }
+
+  private static class TestObject implements DataSerializable {
+    public int sizeForSizer;
+    public int sizeForSerialization;
+
+    public TestObject(int sizeForSerialization, int sizeForSizer) {
+      super();
+      this.sizeForSizer = sizeForSizer;
+      this.sizeForSerialization = sizeForSerialization;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      sizeForSizer = in.readInt();
+      sizeForSerialization = in.readInt();
+      // We don't actually need these things.
+      in.skipBytes(sizeForSerialization);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      out.writeInt(sizeForSizer);
+      out.writeInt(sizeForSerialization);
+      out.write(new byte[sizeForSerialization]);
+
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + sizeForSerialization;
+      result = prime * result + sizeForSizer;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestObject)) {
+        return false;
+      }
+      TestObject other = (TestObject) obj;
+      if (sizeForSerialization != other.sizeForSerialization) {
+        return false;
+      }
+      return sizeForSizer == other.sizeForSizer;
+    }
+
+    @Override
+    public String toString() {
+      return "TestObject [sizeForSerialization=" + sizeForSerialization + ", 
sizeForSizer="
+          + sizeForSizer + "]";
+    }
+  }
+
+  public static class TestCacheListener<K, V> extends CacheListenerAdapter<K, 
V> {
+
+    @Override
+    public void afterCreate(EntryEvent<K, V> event) {
+      // Make sure we deserialize the new value
+      logger.debug("invoked afterCreate with " + event);
+      logger.info(String.format("%s",
+          "value is " + event.getNewValue()));
+    }
+
+    @Override
+    public void afterUpdate(EntryEvent<K, V> event) {
+      // Make sure we deserialize the new value
+      logger.debug("invoked afterUpdate with ");
+      logger.info(String.format("%s",
+          "value is " + event.getNewValue()));

Review comment:
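       See the comments above on the `afterCreate()` method; both apply here as 
well: log the "invoked afterUpdate" message at info level rather than debug, 
and drop the unnecessary `String.format()` wrapper.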

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);

Review comment:
       The commented-out code here should be removed, possibly along with the 
rest of the log message. It doesn't seem to add much in terms of helping a 
developer debug a possible future failure. If the logging is left in, it should 
be changed to info level.

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = 
"_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertEquals(1, getObjectSizerInvocations(vm1));
+    assertEquals(0, getObjectSizerInvocations(vm2));
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertEquals(2, getObjectSizerInvocations(vm1));
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertEquals(5, finalEvictionSize0 - origEvictionSize0);
+    if (shouldSizeChange) {
+      assertEquals(1, getObjectSizerInvocations(vm2));
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalEvictionSize1 - origEvictionSize1 != 0);
+    } else {
+      // we invoke the sizer once when we deserialize the original to apply 
the delta to it
+      assertEquals(0, getObjectSizerInvocations(vm2));
+      assertEquals(0, finalEvictionSize1 - origEvictionSize1);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, 
shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertTrue(finalPRSize0 - origPRSize0 != 0);
+      assertTrue(finalPRSize1 - origPRSize1 != 0);
+    } else {
+      assertEquals(0, finalPRSize0 - origPRSize0);
+      assertEquals(0, finalPRSize1 - origPRSize1);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(LocalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) 
region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      
regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+      LocalRegion region = (LocalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = 
region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertFalse("Value was " + value + " type " + value.getClass(),
+              (value instanceof CachedDeserializable));
+          break;
+        case CD_SERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertTrue("Value not serialized",
+              ((CachedDeserializable) value).getValue() instanceof byte[]);
+          break;
+        case CD_DESERIALIZED:
+          assertTrue("Value was " + value + " type " + value.getClass(),
+              value instanceof CachedDeserializable);
+          assertFalse("Value was serialized",
+              (((CachedDeserializable) value).getValue() instanceof byte[]));
+          break;
+        case EVICTED:
+          assertNull(value);
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertTrue(file.mkdirs());
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertNotNull(cache);
+
+      RegionFactory<Integer, TestDelta> regionFactory = 
cache.createRegionFactory();
+
+      regionFactory.setDiskSynchronous(true);
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setSubscriptionAttributes(new 
SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.debug("TestObjectSizer invoked"/* , new Exception("stack trace") 
*/);
+      if (o instanceof TestObject) {
+        invocations.incrementAndGet();
+        return ((TestObject) o).sizeForSizer;
+      }
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      if (o instanceof TestKey) {
+        invocations.incrementAndGet();
+        return ((TestKey) o).value.length();
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() 
+ ", object=" + o);
+    }
+  }
+
+  private static class TestKey implements DataSerializable {
+    String value;
+
+    public TestKey(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      value = DataSerializer.readString(in);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(value, out);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((value == null) ? 0 : value.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TestKey)) {
+        return false;
+      }
+      TestKey other = (TestKey) obj;
+      if (value == null) {
+        return other.value == null;
+      } else {
+        return value.equals(other.value);
+      }
+    }
+
+  }
+
+  private static class TestObject implements DataSerializable {
+    public int sizeForSizer;
+    public int sizeForSerialization;
+
+    public TestObject(int sizeForSerialization, int sizeForSizer) {

Review comment:
       This class is no longer instantiated anywhere in the test, so it can be 
removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow individual deltas to trigger bucket size recalculation
> ------------------------------------------------------------
>
>                 Key: GEODE-8894
>                 URL: https://issues.apache.org/jira/browse/GEODE-8894
>             Project: Geode
>          Issue Type: New Feature
>          Components: core, serialization
>    Affects Versions: 1.14.0
>            Reporter: Raymond Ingles
>            Priority: Major
>              Labels: pull-request-available
>
> The Redis subsystem uses Deltas heavily, but by default deltas do not trigger 
> an update to the size of their buckets. This leads to incorrect memory usage 
> accounting over the long term, especially with the use of Redis commands like 
> "APPEND".
> It is possible to set the system property "DELTAS_RECALCULATE_SIZE", but this 
> is a global value that would affect the processing of all deltas, including 
> non-Redis operations.
> Instead, we will add a new default method to the Delta interface that can be 
> overridden by individual Delta implementations (such as Redis). This will 
> trigger the same behavior as DELTAS_RECALCULATE_SIZE, but on a per-delta 
> basis. Thus, other Geode operations will not force bucket size recalculations 
> unless the global property is set, but Redis statistics will be correct.
> Other types of delta operations may find this useful in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to