This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 30bd1cef01 GEODE-9704: Ensure that register interest is called before 
ready for events (#7442)
30bd1cef01 is described below

commit 30bd1cef01b555c84e970c548cfb0e55f06fbf1c
Author: mhansonp <[email protected]>
AuthorDate: Wed Apr 6 09:27:03 2022 -0700

    GEODE-9704: Ensure that register interest is called before ready for events 
(#7442)
    
    
    - RegisterInterestOps need to happen before ReadyForEventsOp is sent
      These changes make sure that happens.
    
    - Added an InterestResultPolicyCheck
    
    Authored-by: Barry Oglesby <[email protected]>
    
    Un-ignored a test that will reproduce the issue
    periodically during a number of runs. It is a flaky
    test without the core fix.
    
    Authored-by: Jinmei Liao <[email protected]>
---
 .../tier/sockets/DurableRegistrationDUnitTest.java | 802 ---------------------
 .../DurableRegistrationDistributedTest.java        | 703 ++++++++++++++++++
 ...est.java => AuthExpirationDistributedTest.java} |  18 +-
 ...tTest.java => QueueManagerIntegrationTest.java} | 178 +++--
 .../cache/client/internal/QueueManagerImpl.java    |  59 +-
 .../cache/client/internal/ReadyForEventsOp.java    |   4 +-
 .../client/internal/RegisterInterestListOp.java    |   4 +-
 .../client/internal/RegisterInterestTracker.java   |  52 +-
 .../cache/client/internal/ServerRegionProxy.java   |   2 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   8 +-
 .../internal/cache/LocalRegionUpdateUnitTest.java  | 149 ++++
 .../dunit/internal/JUnit4DistributedTestCase.java  |   2 +-
 12 files changed, 1084 insertions(+), 897 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
deleted file mode 100644
index 51e69cff23..0000000000
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static 
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import static 
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.Pool;
-import org.apache.geode.cache.client.PoolFactory;
-import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.CacheServerImpl;
-import org.apache.geode.internal.cache.FilterProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PoolFactoryImpl;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-
-/**
- * We have 2 servers and One client which registers some keys with durable 
interest and some without
- * it. We maintain queues on only One server as redundancy level is one. 
Following 2 tests have the
- * two TestCase scenarios
- *
- * There are two Tests First Test does the follows : // Step 1: Starting the 
servers // Step 2:
- * Bring Up the Client // Step 3: Client registers Interests // Step 4: Update 
Values on the Server
- * for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of 
the DurableClient //
- * Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify 
Updates on the Client
- * // Step 10 : Stop all VMs
- *
- * For Test 2 the steps are as follows // Step 1: Starting the servers // Step 
2: Bring Up the
- * Client // Step 3: Client registers Interests // Step 4: Update Values on 
the Server for Keys //
- * Step 5: Verify Updates on the Client // Step 6: Close Cache of the 
DurableClient // Step 7:
- * Update the Values // Step 8: Re-start the Client // Step 9: Send Client 
Ready Message // Step 10:
- * Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11: 
Unregister Some Keys
- * (Here K1, K3) // Step 12: Modify values on the server for all the Keys // 
Step 13: Check the
- * values for the ones not unregistered and the Unregistered Keys' Values 
should be null
- */
-@Category({ClientSubscriptionTest.class})
-public class DurableRegistrationDUnitTest extends JUnit4DistributedTestCase {
-
-  private VM server1VM;
-
-  private VM server2VM;
-
-  private VM durableClientVM;
-
-  private String regionName;
-
-  private int PORT1;
-
-  private int PORT2;
-
-  private static final String K1 = "KEY_STONE1";
-
-  private static final String K2 = "KEY_STONE2";
-
-  private static final String K3 = "KEY_STONE3";
-
-  private static final String K4 = "KEY_STONE4";
-
-  public DurableRegistrationDUnitTest() {
-    super();
-  }
-
-  @Override
-  public final void postSetUp() throws Exception {
-    Host host = Host.getHost(0);
-    server1VM = host.getVM(0);
-    server2VM = host.getVM(1);
-    durableClientVM = host.getVM(2);
-    regionName = DurableRegistrationDUnitTest.class.getName() + "_region";
-    CacheServerTestUtil.disableShufflingOfEndpoints();
-  }
-
-  @Test
-  public void testSimpleDurableClient() {
-
-    // Step 1: Starting the servers
-    PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-    PORT2 = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-
-    // Step 2: Bring Up the Client
-    // Start a durable client that is not kept alive on the server when it
-    // stops normally
-    final String durableClientId = getName() + "_client";
-
-    final int durableClientTimeout = 600; // keep the client alive for 600
-    // seconds
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2, true,
-            0),
-        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout),
-        Boolean.TRUE));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Step 3: Client registers Interests
-    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
-    // KEY_STONE4 as non-durableKeys
-
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, 
Boolean.TRUE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, 
Boolean.TRUE));
-
-    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
-    // KEY_STONE3, KEY_STONE4
-
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"Value1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"Value2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"Value3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"Value4"));
-
-    Wait.pause(1000);
-    // Step 5: Verify Updates on the Client
-
-    assertEquals("Value1", server2VM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertEquals("Value1", server1VM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-
-    assertEquals("Value1",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertEquals("Value2",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-    assertEquals("Value3",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K3)));
-    assertEquals("Value4",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K4)));
-
-    // Step 6: Close Cache of the DurableClient
-    durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
-    // pause(5000);
-    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
-    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"PingPong1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"PingPong2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"PingPong3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"PingPong4"));
-
-    // Step 8: Re-start the Client
-    durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2,
-                true, 0),
-            regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    Wait.pause(5000);
-
-    assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-
-    Wait.pause(5000);
-    assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"PingPong_updated_1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"PingPong_updated_2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"PingPong_updated_3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"PingPong_updated_4"));
-
-    Wait.pause(5000);
-
-    // Step 9: Verify Updates on the Client
-    assertEquals("PingPong_updated_1",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-    assertEquals("PingPong_updated_3",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K3)));
-    assertEquals("PingPong_updated_4",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K4)));
-
-    // Step 10 : Stop all VMs
-    // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 2
-    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 1
-    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-  }
-
-  @Test
-  public void testSimpleDurableClientWithRegistration() {
-    // Step 1: Starting the servers
-    PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-    PORT2 = server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-
-    // Step 2: Bring Up the Client
-    // Start a durable client that is not kept alive on the server when it
-    // stops normally
-    final String durableClientId = getName() + "_client";
-    // keep the client alive for 600 seconds
-    final int durableClientTimeout = 600;
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2, true,
-            0),
-        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Step 3: Client registers Interests
-    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
-    // KEY_STONE4 as non-durableKeys
-
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, 
Boolean.TRUE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, 
Boolean.TRUE));
-
-    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
-    // KEY_STONE3, KEY_STONE4
-
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"Value1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"Value2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"Value3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"Value4"));
-
-    Wait.pause(1000);
-    // Step 5: Verify Updates on the Client
-
-    assertEquals("Value1", server2VM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertEquals("Value1", server1VM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-
-    assertEquals("Value1",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    assertEquals("Value2",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-    assertEquals("Value3",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K3)));
-    assertEquals("Value4",
-        durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K4)));
-
-    // Step 6: Close Cache of the DurableClient
-    durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
-    // pause(5000);
-    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
-    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"PingPong1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"PingPong2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"PingPong3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"PingPong4"));
-
-    // Step 8: Re-start the Client
-    durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2,
-                true, 0),
-            regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
-
-    // Step 9: Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // pause(1000);
-
-    // Step 10: Register all Keys
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, 
Boolean.TRUE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, 
Boolean.TRUE));
-
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, 
Boolean.FALSE));
-
-    // Step 11: Unregister Some Keys (Here K1, K3)
-    durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.unregisterKey(K1));
-    durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.unregisterKey(K3));
-
-    Wait.pause(5000);
-
-    // Step 12: Modify values on the server for all the Keys
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1, 
"PingPong_updated_1"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2, 
"PingPong_updated_2"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3, 
"PingPong_updated_3"));
-    server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4, 
"PingPong_updated_4"));
-
-    Wait.pause(5000);
-
-    // Step 13: Check the values for the ones not unregistered and the
-    // Unregistered Keys' Values should be null
-    try {
-      assertEquals("PingPong_updated_2",
-          durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-    } catch (Exception e) {
-      fail("Prob in KEY_STONE2: "
-          + durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K2)));
-    }
-
-    try {
-      assertEquals("PingPong_updated_4",
-          durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K4)));
-    } catch (Exception e) {
-      fail("Prob in KEY_STONE4: "
-          + durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K4)));
-
-    }
-
-    try {
-      assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-    } catch (Exception e) {
-      fail("Prob in KEY_STONE1: "
-          + durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K1)));
-
-    }
-
-    try {
-      assertNull(durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K3)));
-    } catch (Exception e) {
-      fail("Prob in KEY_STONE3: "
-          + durableClientVM.invoke(() -> 
DurableRegistrationDUnitTest.getValue(K3)));
-
-    }
-
-    // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 2
-    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 1
-    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-  }
-
-  @Test
-  public void testDurableClientWithRegistrationHA() {
-
-    // Step 1: Start server1
-    PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-    PORT2 = getRandomAvailableTCPPort();
-
-    // Step 2: Bring Up the Client
-    final String durableClientId = getName() + "_client";
-    // keep the client alive for 600 seconds
-    final int durableClientTimeout = 600;
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2, true,
-            1),
-        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Step 3: Client registers Interests
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, 
Boolean.TRUE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, 
Boolean.TRUE));
-
-    // Step 4: Bring up the server2
-
-    server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE, PORT2));
-
-    Wait.pause(3000);
-
-    // Check server2 got all the interests registered by the durable client.
-    server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
-      @Override
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter()
-            .info("### Verifying interests registered by DurableClient. ###");
-        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-        CacheClientProxy p = null;
-
-        // Get proxy for the client.
-        for (int i = 0; i < 60; i++) {
-          Iterator ps = ccn.getClientProxies().iterator();
-          if (!ps.hasNext()) {
-            Wait.pause(1000);
-            continue;
-          } else {
-            p = (CacheClientProxy) ps.next();
-            break;
-          }
-        }
-
-        if (p == null) {
-          fail("Proxy initialization taking long time. Increase the wait 
time.");
-        }
-
-        Iterator rs = p.getInterestRegisteredRegions().iterator();
-        String rName = (String) rs.next();
-        assertNotNull("Null region Name found.", rs);
-        LocalRegion r = (LocalRegion) 
GemFireCacheImpl.getInstance().getRegion(rName);
-        assertNotNull("Null region found.", r);
-        FilterProfile pf = r.getFilterProfile();
-        Set intrests = Collections.EMPTY_SET;
-
-        Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
-        assertNotNull("durable Interests not found for the proxy", 
interestKeys);
-        assertEquals("The number of durable keys registered during HARegion 
GII doesn't match.",
-            interestKeys.size(), 2);
-        interestKeys = pf.getKeysOfInterest(p.getProxyID());
-        assertNotNull("non-durable Interests not found for the proxy", 
interestKeys);
-        assertEquals("The number of non-durable keys registered during 
HARegion GII doesn't match.",
-            interestKeys.size(), 2);
-      }
-    });
-
-
-    // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 2
-    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 1
-    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-  }
-
-  @Test
-  public void testDurableClientDisConnectWithRegistrationHA() {
-
-    // Step 1: Start server1
-    PORT1 = server1VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE));
-    PORT2 = getRandomAvailableTCPPort();
-
-    // Step 2: Bring Up the Client
-    final String durableClientId = getName() + "_client";
-    // keep the client alive for 600 seconds
-    final int durableClientTimeout = 600;
-    durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2, true,
-            1),
-        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Step 3: Client registers Interests
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2, 
Boolean.FALSE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3, 
Boolean.TRUE));
-    durableClientVM
-        .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4, 
Boolean.TRUE));
-
-    // Close Cache of the DurableClient
-    durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
-
-    Wait.pause(2000);
-
-    // Re-start the Client
-    durableClientVM
-        .invoke(() -> CacheServerTestUtil.createCacheClient(
-            
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1, 
PORT2,
-                true, 1),
-            regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
-
-    // Send clientReady message
-    durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      @Override
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Step 4: Bring up the server2
-    server2VM
-        .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, 
Boolean.TRUE, PORT2));
-
-    Wait.pause(3000);
-
-    // Check server2 got all the interests registered by the durable client.
-    server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
-      @Override
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter()
-            .info("### Verifying interests registered by DurableClient. ###");
-        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-        CacheClientProxy p = null;
-
-        // Get proxy for the client.
-        for (int i = 0; i < 60; i++) {
-          Iterator ps = ccn.getClientProxies().iterator();
-          if (!ps.hasNext()) {
-            Wait.pause(1000);
-            continue;
-          } else {
-            p = (CacheClientProxy) ps.next();
-            break;
-          }
-        }
-
-        if (p == null) {
-          fail("Proxy initialization taking long time. Increase the wait 
time.");
-        }
-
-        Iterator rs = p.getInterestRegisteredRegions().iterator();
-        String rName = (String) rs.next();
-        assertNotNull("Null region Name found.", rs);
-        LocalRegion r = (LocalRegion) 
GemFireCacheImpl.getInstance().getRegion(rName);
-        assertNotNull("Null region found.", r);
-        FilterProfile pf = r.getFilterProfile();
-        Set intrests = Collections.EMPTY_SET;
-
-        Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
-        assertNotNull("durable Interests not found for the proxy", 
interestKeys);
-        assertEquals("The number of durable keys registered during HARegion 
GII doesn't match.",
-            interestKeys.size(), 2);
-        interestKeys = pf.getKeysOfInterest(p.getProxyID());
-        assertNull("non-durable Interests found for the proxy", interestKeys);
-      }
-    });
-
-
-    // Stop the durable client
-    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 2
-    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-    // Stop server 1
-    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
-  }
-
-  private static void unregisterAllKeys() {
-    // Get the region
-    Region region = CacheServerTestUtil.getCache()
-        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-    // Region region =
-    // 
CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
-    assertNotNull(region);
-    region.unregisterInterest(K1);
-    region.unregisterInterest(K2);
-    region.unregisterInterest(K3);
-    region.unregisterInterest(K4);
-
-  }
-
-  private static void registerKeys() throws Exception {
-    try {
-      // Get the region
-      Region region = CacheServerTestUtil.getCache()
-          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-      // Region region =
-      // 
CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
-      assertNotNull(region);
-
-      region.registerInterest(K1, InterestResultPolicy.KEYS_VALUES, false);
-      region.registerInterest(K2, InterestResultPolicy.KEYS_VALUES, false);
-      region.registerInterest(K3, InterestResultPolicy.KEYS_VALUES, true);
-      region.registerInterest(K4, InterestResultPolicy.KEYS_VALUES, true);
-
-      assertNotNull(region.getInterestList());
-
-    } catch (Exception ex) {
-      Assert.fail("failed while registering interest in registerKey function", 
ex);
-    }
-  }
-
-  private static String getValue(String key) {
-    Region r = CacheServerTestUtil.getCache()
-        .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-    // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
-    assertNotNull(r);
-    // String value = (String)r.get(key);
-    // String value = (String)r.getEntry(key).getValue();
-    Region.Entry re = r.getEntry(key);
-
-    if (re == null) {
-      return null;
-    } else {
-      return (String) re.getValue();
-    }
-  }
-
-  private static void registerKey(String key, boolean isDurable) throws 
Exception {
-    try {
-      // Get the region
-      Region region = CacheServerTestUtil.getCache()
-          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-      // Region region =
-      // CacheServerTestUtil.getCache().getRegion(regionName);
-      assertNotNull(region);
-      region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
-    } catch (Exception ex) {
-      Assert.fail("failed while registering interest in registerKey function", 
ex);
-    }
-  }
-
-  private static void unregisterKey(String key) throws Exception {
-    try {
-      // Get the region
-      Region region = CacheServerTestUtil.getCache()
-          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-      // Region region =
-      // CacheServerTestUtil.getCache().getRegion(regionName);
-      assertNotNull(region);
-      region.unregisterInterest(key);
-    } catch (Exception ex) {
-      Assert.fail("failed while registering interest in registerKey function", 
ex);
-    }
-  }
-
-  private static void putValue(String key, String value) {
-    try {
-      Region r = CacheServerTestUtil.getCache()
-          .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
-      // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
-      assertNotNull(r);
-      if (r.getEntry(key) != null) {
-        r.put(key, value);
-      } else {
-        r.create(key, value);
-      }
-      assertEquals(value, r.getEntry(key).getValue());
-    } catch (Exception e) {
-
-      fail("Put in Server has some fight");
-
-    }
-  }
-
-  private Pool getClientPool(String host, int server1Port, int server2Port,
-      boolean establishCallbackConnection, int redundancyLevel) {
-    PoolFactory pf = PoolManager.createFactory();
-    pf.addServer(host, server1Port).addServer(host, server2Port)
-        .setSubscriptionEnabled(establishCallbackConnection)
-        .setSubscriptionRedundancy(redundancyLevel);
-    return ((PoolFactoryImpl) pf).getPoolAttributes();
-  }
-
-  private Properties getClientDistributedSystemProperties(String 
durableClientId) {
-    return getClientDistributedSystemProperties(durableClientId,
-        DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
-  }
-
-  private static void checkNumberOfClientProxies(final int expected) {
-    WaitCriterion ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return expected == getNumberOfClientProxies();
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-    GeodeAwaitility.await().untilAsserted(ev);
-  }
-
-  protected static int getNumberOfClientProxies() {
-    return 
getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size();
-  }
-
-  private Properties getClientDistributedSystemProperties(String 
durableClientId,
-      int durableClientTimeout) {
-    Properties properties = new Properties();
-    properties.setProperty(MCAST_PORT, "0");
-    properties.setProperty(LOCATORS, "");
-    properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
-    properties.setProperty(DURABLE_CLIENT_TIMEOUT, 
String.valueOf(durableClientTimeout));
-    return properties;
-  }
-
-  private static CacheClientProxy getClientProxy() {
-    // Get the CacheClientNotifier
-    CacheClientNotifier notifier = 
getBridgeServer().getAcceptor().getCacheClientNotifier();
-
-    // Get the CacheClientProxy or not (if proxy set is empty)
-    CacheClientProxy proxy = null;
-    Iterator i = notifier.getClientProxies().iterator();
-    if (i.hasNext()) {
-      proxy = (CacheClientProxy) i.next();
-    }
-    return proxy;
-  }
-
-  private static CacheServerImpl getBridgeServer() {
-    CacheServerImpl bridgeServer =
-        (CacheServerImpl) 
CacheServerTestUtil.getCache().getCacheServers().iterator().next();
-    assertNotNull(bridgeServer);
-    return bridgeServer;
-  }
-
-  public static void closeCache() {
-    Cache cache = CacheServerTestUtil.getCache();
-    if (cache != null && !cache.isClosed()) {
-      cache.close(true);
-      cache.getDistributedSystem().disconnect();
-    }
-  }
-
-  public String getRegionName() {
-    return regionName;
-  }
-
-  public void setRegionName(String regionName) {
-    this.regionName = regionName;
-  }
-
-  @Override
-  public final void preTearDown() throws Exception {
-    CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
-  }
-}
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
new file mode 100644
index 0000000000..627403fc38
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.disableShufflingOfEndpoints;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static 
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+/**
+ * We have 2 servers and One client which registers some keys with durable 
interest and some without
+ * it. We maintain queues on only One server as redundancy level is one. 
Following 2 tests have the
+ * two TestCase scenarios
+ *
+ * There are two Tests First Test does the follows : // Step 1: Starting the 
servers // Step 2:
+ * Bring Up the Client // Step 3: Client registers Interests // Step 4: Update 
Values on the Server
+ * for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of 
the DurableClient //
+ * Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify 
Updates on the Client
+ * // Step 10 : Stop all VMs
+ *
+ * For Test 2 the steps are as follows // Step 1: Starting the servers // Step 
2: Bring Up the
+ * Client // Step 3: Client registers Interests // Step 4: Update Values on 
the Server for Keys //
+ * Step 5: Verify Updates on the Client // Step 6: Close Cache of the 
DurableClient // Step 7:
+ * Update the Values // Step 8: Re-start the Client // Step 9: Send Client 
Ready Message // Step 10:
+ * Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11: 
Unregister Some Keys
+ * (Here K1, K3) // Step 12: Modify values on the server for all the Keys // 
Step 13: Check the
+ * values for the ones not unregistered and the Unregistered Keys' Values 
should be null
+ */
+@Category({ClientSubscriptionTest.class})
+public class DurableRegistrationDistributedTest extends 
JUnit4DistributedTestCase {
+
+  private VM server1VM;
+
+  private VM server2VM;
+
+  private VM durableClientVM;
+
+  private String regionName;
+  private String hostName;
+
+  private int PORT1;
+
+  private int PORT2;
+
+  private static final String K1 = "KEY_STONE1";
+
+  private static final String K2 = "KEY_STONE2";
+
+  private static final String K3 = "KEY_STONE3";
+
+  private static final String K4 = "KEY_STONE4";
+
+  public DurableRegistrationDistributedTest() {
+    super();
+  }
+
+  @Override
+  public final void postSetUp() {
+    server1VM = VM.getVM(0);
+    server2VM = VM.getVM(1);
+    durableClientVM = VM.getVM(2);
+    hostName = VM.getHostName();
+    regionName = DurableRegistrationDistributedTest.class.getName() + 
"_region";
+    disableShufflingOfEndpoints();
+  }
+
+  @Test
+  public void testSimpleDurableClient() {
+
+    // Step 1: Starting the servers
+    PORT1 = server1VM.invoke(() -> createCacheServer(regionName, 
Boolean.TRUE));
+    PORT2 = server2VM.invoke(() -> createCacheServer(regionName, 
Boolean.TRUE));
+
+    // Step 2: Bring Up the Client
+    // Start a durable client that is not kept alive on the server when it 
stops normally
+    final String durableClientId = getName() + "_client";
+
+    final int durableClientTimeout = 600; // keep the client alive for 600
+    // seconds
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2, true,
+            0),
+        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout),
+        Boolean.TRUE));
+
+
+
+    // Step 3: Client registers Interests
+    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
+    // KEY_STONE4 as non-durableKeys
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
+    // KEY_STONE3, KEY_STONE4
+    server2VM.invoke(() -> putValue(K1, "Value1"));
+    server2VM.invoke(() -> putValue(K2, "Value2"));
+    server2VM.invoke(() -> putValue(K3, "Value3"));
+    server2VM.invoke(() -> putValue(K4, "Value4"));
+
+
+    GeodeAwaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+        .until(() -> server2VM.invoke(() -> 
getValue(K1).contentEquals("Value1")));
+    // Step 5: Verify Updates on the Client
+    assertThat(server2VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+    assertThat(server1VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+
+    assertThat(durableClientVM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+    assertThat(durableClientVM.invoke(() -> getValue(K2))).isEqualTo("Value2");
+    assertThat(durableClientVM.invoke(() -> getValue(K3))).isEqualTo("Value3");
+    assertThat(durableClientVM.invoke(() -> getValue(K4))).isEqualTo("Value4");
+
+    // Step 6: Close Cache of the DurableClient
+    durableClientVM.invoke(this::closeCache);
+
+    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
+    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
+    server2VM.invoke(() -> putValue(K1, "PingPong1"));
+    server2VM.invoke(() -> putValue(K2, "PingPong2"));
+    server2VM.invoke(() -> putValue(K3, "PingPong3"));
+    server2VM.invoke(() -> putValue(K4, "PingPong4"));
+
+    // Step 8: Re-start the Client
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2,
+            true, 0),
+        regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
+        .until(() -> durableClientVM.invoke(() -> getValue(K1) == null));
+
+    assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+
+    GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
+        .until(() -> durableClientVM.invoke(() -> getValue(K1) == null));
+    assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+
+    server2VM.invoke(() -> putValue(K1, "PingPong_updated_1"));
+    server2VM.invoke(() -> putValue(K2, "PingPong_updated_2"));
+    server2VM.invoke(() -> putValue(K3, "PingPong_updated_3"));
+    server2VM.invoke(() -> putValue(K4, "PingPong_updated_4"));
+
+
+    // Step 9: Verify Updates on the Client
+    GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS).until(
+        () -> durableClientVM.invoke(() -> 
getValue(K1).contentEquals("PingPong_updated_1")));
+
+    assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+    assertThat(durableClientVM.invoke(() -> 
getValue(K3))).isEqualTo("PingPong_updated_3");
+    assertThat(durableClientVM.invoke(() -> 
getValue(K4))).isEqualTo("PingPong_updated_4");
+
+    // Step 10 : Stop all VMs
+    // Stop the durable client
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 2
+    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 1
+    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+  }
+
+  @Test
+  public void testSimpleDurableClientWithRegistration() {
+    // Step 1: Starting the servers
+    PORT1 = server1VM.invoke(() -> createCacheServer(regionName, 
Boolean.TRUE));
+    PORT2 = server2VM.invoke(() -> createCacheServer(regionName, 
Boolean.TRUE));
+
+    // Step 2: Bring Up the Client
+    // Start a durable client that is not kept alive on the server when it 
stops normally
+    final String durableClientId = getName() + "_client";
+    // keep the client alive for 600 seconds
+    final int durableClientTimeout = 600;
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2, true,
+            0),
+        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
+
+    // Step 3: Client registers Interests
+    // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
+    // KEY_STONE4 as non-durableKeys
+
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
+    // KEY_STONE3, KEY_STONE4
+    server2VM.invoke(() -> putValue(K1, "Value1"));
+    server2VM.invoke(() -> putValue(K2, "Value2"));
+    server2VM.invoke(() -> putValue(K3, "Value3"));
+    server2VM.invoke(() -> putValue(K4, "Value4"));
+
+
+    // Step 5: Verify Updates on the Client
+    GeodeAwaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+        .until(() -> server2VM.invoke(() -> 
getValue(K1).contentEquals("Value1")));
+    assertThat(server1VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+
+    assertThat(durableClientVM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+    assertThat(durableClientVM.invoke(() -> getValue(K2))).isEqualTo("Value2");
+    assertThat(durableClientVM.invoke(() -> getValue(K3))).isEqualTo("Value3");
+    assertThat(durableClientVM.invoke(() -> getValue(K4))).isEqualTo("Value4");
+
+    // Step 6: Close Cache of the DurableClient
+    durableClientVM.invoke(this::closeCache);
+
+    // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
+    // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
+    server2VM.invoke(() -> putValue(K1, "PingPong1"));
+    server2VM.invoke(() -> putValue(K2, "PingPong2"));
+    server2VM.invoke(() -> putValue(K3, "PingPong3"));
+    server2VM.invoke(() -> putValue(K4, "PingPong4"));
+
+    // Step 8: Re-start the Client
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2, true, 0),
+        regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
+
+
+    // Step 9: Register all Keys
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+
+
+    // Step 10: Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 11: Unregister Some Keys (Here K1, K3)
+    durableClientVM.invoke(() -> unregisterKey(K1));
+    durableClientVM.invoke(() -> unregisterKey(K3));
+
+    // Step 12: Modify values on the server for all the Keys
+    server2VM.invoke(() -> putValue(K1, "PingPong_updated_1"));
+    server2VM.invoke(() -> putValue(K2, "PingPong_updated_2"));
+    server2VM.invoke(() -> putValue(K3, "PingPong_updated_3"));
+    server2VM.invoke(() -> putValue(K4, "PingPong_updated_4"));
+
+    // Step 13: Check the values for the ones not unregistered and the
+    // Unregistered Keys' Values should be null
+    GeodeAwaitility.await("Prob in KEY_STONE2: ").atMost(5000, 
TimeUnit.MILLISECONDS).until(
+        () -> durableClientVM.invoke(() -> 
getValue(K2).contentEquals("PingPong_updated_2")));
+    assertThat(durableClientVM.invoke(() -> getValue(K4))).describedAs("Prob 
in KEY_STONE4: ")
+        .isEqualTo("PingPong_updated_4");
+
+
+    assertThat(durableClientVM.invoke(() -> getValue(K1))).describedAs("Prob 
in KEY_STONE1: ")
+        .isNull();
+    assertThat(durableClientVM.invoke(() -> getValue(K3))).describedAs("Prob 
in KEY_STONE1: ")
+        .isEqualTo("PingPong3");
+
+    // Stop the durable client
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 2
+    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 1
+    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+  }
+
+  @Test
+  public void testDurableClientWithRegistrationHA() {
+
+    // Step 1: Start server1
+    PORT1 = server1VM.invoke(() -> createCacheServer(regionName, 
Boolean.TRUE));
+    PORT2 = getRandomAvailableTCPPort();
+
+    // Step 2: Bring Up the Client
+    final String durableClientId = getName() + "_client";
+    // keep the client alive for 600 seconds
+    final int durableClientTimeout = 600;
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2, true,
+            1),
+        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 3: Client registers Interests
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+    // Step 4: Bring up the server2
+    server2VM.invoke(() -> createCacheServer(regionName, Boolean.TRUE, PORT2));
+
+    GeodeAwaitility.await().atMost(3000, TimeUnit.MILLISECONDS)
+        .until(() -> server2VM.invoke(() -> 
getCache().getCacheServers().get(0).isRunning()));
+
+
+    // Check server2 got all the interests registered by the durable client.
+    server2VM.invoke("Verify Interests.", new CacheSerializableRunnable() {
+      @Override
+      public void run2() throws CacheException {
+        logger.info("### Verifying interests registered by DurableClient. 
###");
+        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+        CacheClientProxy p = null;
+
+        // Get proxy for the client.
+        for (int i = 0; i < 60; i++) {
+          Iterator<CacheClientProxy> ps = ccn.getClientProxies().iterator();
+          if (!ps.hasNext()) {
+            Wait.pause(1000);
+          } else {
+            p = ps.next();
+            break;
+          }
+        }
+
+        if (p == null) {
+          fail("Proxy initialization taking long time. Increase the wait 
time.");
+        }
+
+        Iterator<String> rs = p.getInterestRegisteredRegions().iterator();
+        String rName = rs.next();
+        assertThat(rs).describedAs("Null region Name found.").isNotNull();
+
+        assertThat(p.getCache()).isEqualTo(GemFireCacheImpl.getInstance());
+        LocalRegion r = (LocalRegion) 
GemFireCacheImpl.getInstance().getRegion(rName);
+        assertThat(r).describedAs("Null region found.").isNotNull();
+        FilterProfile pf = r.getFilterProfile();
+
+        Set<String> interestKeys = 
pf.getKeysOfInterest(p.getProxyID().getDurableId());
+        assertThat(interestKeys).describedAs("durable Interests not found for 
the proxy")
+            .isNotNull();
+        assertThat(interestKeys.size())
+            .describedAs("The number of durable keys registered during 
HARegion GII doesn't match.")
+            .isEqualTo(2);
+        interestKeys = pf.getKeysOfInterest(p.getProxyID());
+        assertThat(interestKeys).describedAs("non-durable Interests not found 
for the proxy")
+            .isNotNull();
+        assertThat(2)
+            .describedAs(
+                "The number of non-durable keys registered during HARegion GII 
doesn't match.")
+            .isEqualTo(interestKeys.size());
+      }
+    });
+
+
+    // Stop the durable client
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 2
+    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 1
+    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+  }
+
+  @Test
+  public void testDurableClientDisConnectWithRegistrationHA() {
+
+    // Step 1: Start server1
+    PORT1 = server1VM
+        .invoke(() -> createCacheServer(regionName, Boolean.TRUE));
+    PORT2 = getRandomAvailableTCPPort();
+
+    // Step 2: Bring Up the Client
+    final String durableClientId = getName() + "_client";
+    // keep the client alive for 600 seconds
+    final int durableClientTimeout = 600;
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2, true,
+            1),
+        regionName, getClientDistributedSystemProperties(durableClientId, 
durableClientTimeout)));
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 3: Client registers Interests
+    durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+    durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+    durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+    // Close Cache of the DurableClient
+    durableClientVM.invoke(this::closeCache);
+    GeodeAwaitility.await().until(() -> durableClientVM.invoke(() -> 
getClientCache().isClosed()));
+
+    // Re-start the Client
+    durableClientVM.invoke(() -> createCacheClient(
+        getClientPool(hostName, PORT1, PORT2,
+            true, 1),
+        regionName, getClientDistributedSystemProperties(durableClientId), 
Boolean.TRUE));
+
+    // Send clientReady message
+    durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable() 
{
+      @Override
+      public void run2() throws CacheException {
+        getClientCache().readyForEvents();
+      }
+    });
+
+    // Step 4: Bring up the server2
+    server2VM
+        .invoke(() -> createCacheServer(regionName, Boolean.TRUE, PORT2));
+
+    Wait.pause(3000);
+
+    // Check server2 got all the interests registered by the durable client.
+    server2VM.invoke("Verify Interests.", new CacheSerializableRunnable() {
+      @Override
+      public void run2() throws CacheException {
+        logger.info("### Verifying interests registered by DurableClient. 
###");
+        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+        CacheClientProxy p = null;
+
+        // Get proxy for the client.
+        for (int i = 0; i < 60; i++) {
+          Iterator<CacheClientProxy> ps = ccn.getClientProxies().iterator();
+          if (!ps.hasNext()) {
+            Wait.pause(1000);
+          } else {
+            p = ps.next();
+            break;
+          }
+        }
+
+        if (p == null) {
+          fail("Proxy initialization taking long time. Increase the wait 
time.");
+        }
+
+        Iterator<String> rs = p.getInterestRegisteredRegions().iterator();
+        String rName = rs.next();
+        assertThat(rs).describedAs("Null region Name found.").isNotNull();
+        LocalRegion r = (LocalRegion) 
GemFireCacheImpl.getInstance().getRegion(rName);
+        assertThat(r).describedAs("Null region found.").isNotNull();
+        FilterProfile pf = r.getFilterProfile();
+
+        Set<String> interestKeys = 
pf.getKeysOfInterest(p.getProxyID().getDurableId());
+        assertThat(interestKeys).describedAs("durable Interests not found for 
the proxy")
+            .isNotNull();
+        assertThat(2)
+            .describedAs("The number of durable keys registered during 
HARegion GII doesn't match.")
+            .isEqualTo(interestKeys.size());
+        interestKeys = pf.getKeysOfInterest(p.getProxyID());
+        assertThat(interestKeys).describedAs("non-durable Interests found for 
the proxy").isNull();
+      }
+    });
+
+
+    // Stop the durable client
+    durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 2
+    server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop server 1
+    server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+  }
+
+  private void registerKeys() {
+    try {
+      // Get the region
+      Region<String, String> region = getCache()
+          .getRegion(DurableRegistrationDistributedTest.class.getName() + 
"_region");
+
+      assertThat(region).isNotNull();
+
+      region.registerInterest(K1, InterestResultPolicy.KEYS_VALUES, false);
+      region.registerInterest(K2, InterestResultPolicy.KEYS_VALUES, false);
+      region.registerInterest(K3, InterestResultPolicy.KEYS_VALUES, true);
+      region.registerInterest(K4, InterestResultPolicy.KEYS_VALUES, true);
+
+      assertThat(region.getInterestList()).isNotNull();
+
+    } catch (Exception ex) {
+      fail("failed while registering interest in registerKey function", ex);
+    }
+  }
+
+  private String getValue(String key) {
+    Region<String, String> r =
+        
getCache().getRegion(DurableRegistrationDistributedTest.class.getName() + 
"_region");
+
+    assertThat(r).isNotNull();
+
+    Region.Entry<String, String> re = r.getEntry(key);
+
+    if (re == null) {
+      return null;
+    } else {
+      return re.getValue();
+    }
+  }
+
+  private void registerKey(String key, boolean isDurable) {
+    try {
+      // Get the region
+      Region<String, String> region = getCache()
+          .getRegion(DurableRegistrationDistributedTest.class.getName() + 
"_region");
+
+      assertThat(region).isNotNull();
+      region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
+    } catch (Exception ex) {
+      fail("failed while registering interest in registerKey function", ex);
+    }
+  }
+
+  private void unregisterKey(String key) {
+    try {
+      // Get the region
+      Region<String, String> region = getCache()
+          .getRegion(DurableRegistrationDistributedTest.class.getName() + 
"_region");
+
+      assertThat(region).isNotNull();
+      region.unregisterInterest(key);
+    } catch (Exception ex) {
+      fail("failed while registering interest in registerKey function", ex);
+    }
+  }
+
+  private void putValue(String key, String value) {
+    try {
+      Region<String, String> r = getCache()
+          .getRegion(DurableRegistrationDistributedTest.class.getName() + 
"_region");
+
+      assertThat(r).isNotNull();
+      if (r.getEntry(key) != null) {
+        r.put(key, value);
+      } else {
+        r.create(key, value);
+      }
+      assertThat(r.getEntry(key).getValue()).isEqualTo(value);
+    } catch (Exception e) {
+
+      fail("Put in Server has some fight");
+
+    }
+  }
+
+  private Pool getClientPool(String host, int server1Port, int server2Port,
+      boolean establishCallbackConnection, int redundancyLevel) {
+    PoolFactory pf = PoolManager.createFactory();
+    pf.addServer(host, server1Port).addServer(host, server2Port)
+        .setSubscriptionEnabled(establishCallbackConnection)
+        .setSubscriptionRedundancy(redundancyLevel);
+    return ((PoolFactoryImpl) pf).getPoolAttributes();
+  }
+
+  private Properties getClientDistributedSystemProperties(String 
durableClientId) {
+    return getClientDistributedSystemProperties(durableClientId,
+        DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
+  }
+
+  protected int getNumberOfClientProxies() {
+    return 
getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size();
+  }
+
+  private Properties getClientDistributedSystemProperties(String 
durableClientId,
+      int durableClientTimeout) {
+    Properties properties = new Properties();
+    properties.setProperty(MCAST_PORT, "0");
+    properties.setProperty(LOCATORS, "");
+    properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
+    properties.setProperty(DURABLE_CLIENT_TIMEOUT, 
String.valueOf(durableClientTimeout));
+    return properties;
+  }
+
+  private CacheClientProxy getClientProxy() {
+    // Get the CacheClientNotifier
+    CacheClientNotifier notifier = 
getBridgeServer().getAcceptor().getCacheClientNotifier();
+
+    // Get the CacheClientProxy or not (if proxy set is empty)
+    CacheClientProxy proxy = null;
+    Iterator<CacheClientProxy> i = notifier.getClientProxies().iterator();
+    if (i.hasNext()) {
+      proxy = i.next();
+    }
+    return proxy;
+  }
+
+  private CacheServerImpl getBridgeServer() {
+    CacheServerImpl bridgeServer =
+        (CacheServerImpl) getCache().getCacheServers().iterator().next();
+    assertThat(bridgeServer).isNotNull();
+    return bridgeServer;
+  }
+
+  public void closeCache() {
+    ClientCache cache = getClientCache();
+    if (cache != null && !cache.isClosed()) {
+      cache.close(true);
+    }
+  }
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public void setRegionName(String regionName) {
+    this.regionName = regionName;
+  }
+
+  @Override
+  public final void preTearDown() {
+    resetDisableShufflingOfEndpointsFlag();
+  }
+}
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
similarity index 95%
rename from 
geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
rename to 
geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
index bf38db0a6b..8a3a1de11b 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -50,7 +49,7 @@ import org.apache.geode.test.junit.categories.SecurityTest;
 import org.apache.geode.test.junit.rules.ServerStarterRule;
 
 @Category({SecurityTest.class})
-public class AuthExpirationDUnitTest {
+public class AuthExpirationDistributedTest {
   @Rule
   public ClusterStartupRule cluster = new ClusterStartupRule();
 
@@ -119,7 +118,8 @@ public class AuthExpirationDUnitTest {
   }
 
   @Test
-  public void registeredInterest_slowReAuth_policyDefault() throws Exception {
+  public void 
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyDefault()
+      throws Exception {
     int serverPort = server.getPort();
     clientVM = cluster.startClientVM(0,
         c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, 
UpdatableUserAuthInitialize.class.getName())
@@ -171,8 +171,8 @@ public class AuthExpirationDUnitTest {
   }
 
   @Test
-  @Ignore("unnecessary test case for re-auth, but it manifests GEODE-9704")
-  public void registeredInterest_slowReAuth_policyKeys_durableClient() throws 
Exception {
+  public void 
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_durableClient()
+      throws Exception {
     int serverPort = server.getPort();
     clientVM = cluster.startClientVM(0,
         c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, 
UpdatableUserAuthInitialize.class.getName())
@@ -180,14 +180,13 @@ public class AuthExpirationDUnitTest {
             .withPoolSubscription(true)
             .withServerConnection(serverPort));
 
-
     clientVM.invoke(() -> {
       UpdatableUserAuthInitialize.setUser("user1");
       ClientCache clientCache = ClusterStartupRule.getClientCache();
       Region<Object, Object> region = clientCache
           
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
 
-      region.registerInterestForAllKeys(InterestResultPolicy.KEYS, true);
+      region.registerInterestForAllKeys(InterestResultPolicy.NONE, true);
       clientCache.readyForEvents();
       UpdatableUserAuthInitialize.setUser("user11");
       // wait for time longer than server's max time to wait to re-authenticate
@@ -215,7 +214,8 @@ public class AuthExpirationDUnitTest {
   private static KeysCacheListener myListener = new KeysCacheListener();
 
   @Test
-  public void registeredInterest_slowReAuth_policyNone_durableClient() throws 
Exception {
+  public void 
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_CacheListener_durableClient()
+      throws Exception {
     int serverPort = server.getPort();
     clientVM = cluster.startClientVM(0,
         c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, 
UpdatableUserAuthInitialize.class.getName())
@@ -260,7 +260,7 @@ public class AuthExpirationDUnitTest {
 
 
   @Test
-  public void registeredInterest_slowReAuth_policyNone_nonDurableClient()
+  public void 
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_nonDurableClient()
       throws Exception {
     int serverPort = server.getPort();
     clientVM = cluster.startClientVM(0,
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
similarity index 78%
rename from 
geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
rename to 
geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
index dbdd1236ad..143dfeaf53 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
@@ -23,7 +23,8 @@ import static 
org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -35,22 +36,27 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.NoSubscriptionServersAvailableException;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.client.SocketFactory;
 import org.apache.geode.cache.client.SubscriptionNotEnabledException;
@@ -59,26 +65,28 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PoolStats;
+import org.apache.geode.internal.cache.tier.InterestType;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 import org.apache.geode.internal.logging.LocalLogWriter;
-import org.apache.geode.test.junit.categories.ClientServerTest;
 
-@Category(ClientServerTest.class)
-public class QueueManagerJUnitTest {
+@Tag("ClientServerTest")
+public class QueueManagerIntegrationTest {
 
-  private DummyPool pool;
+  private TestPool pool;
   private LocalLogWriter logger;
   private DistributedSystem ds;
   private EndpointManagerImpl endpoints;
-  private DummySource source;
-  private DummyFactory factory;
-  private QueueManager manager;
+  private TestConnectionSource source;
+  private TestConnectionFactory factory;
+  private QueueManagerImpl manager;
   private ScheduledExecutorService background;
   private PoolStats stats;
+  private List<Op> opList;
 
-  @Before
+  @BeforeEach
   public void setUp() {
     logger = new LocalLogWriter(FINEST.intLevel(), System.out);
 
@@ -89,17 +97,17 @@ public class QueueManagerJUnitTest {
     ds = DistributedSystem.connect(properties);
 
     stats = new PoolStats(ds, "QueueManagerJUnitTest");
-    pool = new DummyPool();
+    pool = new TestPool();
     endpoints = new EndpointManagerImpl("pool", ds, ds.getCancelCriterion(), 
pool.getStats());
-    source = new DummySource();
-    factory = new DummyFactory();
+    source = new TestConnectionSource();
+    factory = new TestConnectionFactory();
     background = Executors.newSingleThreadScheduledExecutor();
-
+    opList = new LinkedList<>();
     addIgnoredException("Could not find any server to host primary client 
queue.");
     addIgnoredException("Could not find any server to host redundant client 
queue.");
   }
 
-  @After
+  @AfterEach
   public void tearDown() {
     background.shutdownNow();
     manager.close(false);
@@ -195,9 +203,8 @@ public class QueueManagerJUnitTest {
 
     assertPortEquals(2, manager.getAllConnections().getPrimary());
 
-    await().untilAsserted(() -> {
-      assertPortEquals(new int[] {3, 4, 5}, 
manager.getAllConnections().getBackups());
-    });
+    await().untilAsserted(
+        () -> assertPortEquals(new int[] {3, 4, 5}, 
manager.getAllConnections().getBackups()));
   }
 
   @Test
@@ -216,18 +223,16 @@ public class QueueManagerJUnitTest {
 
     assertPortEquals(1, manager.getAllConnections().getPrimary());
 
-    await().untilAsserted(() -> {
-      assertPortEquals(new int[] {2, 3}, 
manager.getAllConnections().getBackups());
-    });
+    await().untilAsserted(
+        () -> assertPortEquals(new int[] {2, 3}, 
manager.getAllConnections().getBackups()));
 
     Connection backup = manager.getAllConnections().getBackups().get(0);
     backup.destroy();
 
     assertPortEquals(1, manager.getAllConnections().getPrimary());
 
-    await().untilAsserted(() -> {
-      assertPortEquals(new int[] {3, 4}, 
manager.getAllConnections().getBackups());
-    });
+    await().untilAsserted(
+        () -> assertPortEquals(new int[] {3, 4}, 
manager.getAllConnections().getBackups()));
   }
 
   @Test
@@ -238,17 +243,13 @@ public class QueueManagerJUnitTest {
     manager.start(background);
     manager.getAllConnections().getPrimary().destroy();
 
-    Throwable thrown = catchThrowable(() -> {
-      manager.getAllConnections().getPrimary();
-    });
-    
assertThat(thrown).isInstanceOf(NoSubscriptionServersAvailableException.class);
-
+    assertThatThrownBy(() -> manager.getAllConnections().getPrimary())
+        .isInstanceOf(NoSubscriptionServersAvailableException.class);
     factory.addConnection(0, 0, 2);
     factory.addConnection(0, 0, 3);
 
-    await().untilAsserted(() -> {
-      assertThatCode(() -> 
manager.getAllConnections()).doesNotThrowAnyException();
-    });
+    await().untilAsserted(
+        () -> assertThatCode(() -> 
manager.getAllConnections()).doesNotThrowAnyException());
 
     assertPortEquals(2, manager.getAllConnections().getPrimary());
   }
@@ -258,7 +259,7 @@ public class QueueManagerJUnitTest {
     String serverRefusedConnectionExceptionMessage =
         "Peer or client version with ordinal x not supported. Highest known 
version is x.x.x.";
     // Spy the factory so that the createClientToServerConnection method can 
be mocked
-    DummyFactory factorySpy = spy(factory);
+    TestConnectionFactory factorySpy = spy(factory);
     manager = new QueueManagerImpl(pool, endpoints, source, factorySpy, 2, 20, 
logger,
         ClientProxyMembershipID.getNewProxyMembership(ds));
     // Cause a ServerRefusedConnectionException to be thrown from 
createClientToServerConnection
@@ -277,8 +278,8 @@ public class QueueManagerJUnitTest {
 
   @Test
   public void 
testAddToConnectionListCallsCloseConnectionOpWithKeepAliveTrue2() {
-    // Create a DummyConnection
-    DummyConnection connection = factory.addConnection(0, 0, 1);
+    // Create a TestConnection
+    TestConnection connection = factory.addConnection(0, 0, 1);
     assertThat(connection.keepAlive).isFalse();
 
     // Get and close its Endpoint
@@ -294,6 +295,70 @@ public class QueueManagerJUnitTest {
     assertThat(connection.keepAlive).isTrue();
   }
 
+  @Test
+  public void recoverPrimaryRegistersBeforeSendingReady() {
+    Set<ServerLocation> excludedServers = new HashSet<>();
+    excludedServers.add(new ServerLocation("localhost", 1));
+    excludedServers.add(new ServerLocation("localhost", 2));
+    excludedServers.add(new ServerLocation("localhost", 3));
+    factory.addConnection(0, 0, 1);
+    factory.addConnection(0, 0, 2);
+    factory.addConnection(0, 0, 3);
+
+    LocalRegion testRegion = mock(LocalRegion.class);
+
+    InternalPool pool = new RecoveryTestPool();
+    ServerRegionProxy serverRegionProxy = new ServerRegionProxy("region", 
pool);
+
+    when(testRegion.getServerProxy()).thenReturn(serverRegionProxy);
+    RegionAttributes<Object, Object> regionAttributes = 
mock(RegionAttributes.class);
+    when(testRegion.getAttributes()).thenReturn(regionAttributes);
+    when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+
+    createRegisterInterestTracker(pool, testRegion);
+
+    manager = new QueueManagerImpl(pool, endpoints, source, factory, 2,
+        20, logger, ClientProxyMembershipID.getNewProxyMembership(ds));
+    manager.start(background);
+    manager.setSendClientReadyInTestOnly();
+    manager.clearQueueConnections();
+    factory.addConnection(0, 0, 4);
+    manager.recoverPrimary(excludedServers);
+
+    
assertThat(opList.get(0)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+    
assertThat(opList.get(1)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+    
assertThat(opList.get(2)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+    
assertThat(opList.get(3)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+    
assertThat(opList.get(4)).isInstanceOf(ReadyForEventsOp.ReadyForEventsOpImpl.class);
+  }
+
+  private void createRegisterInterestTracker(InternalPool localPool,
+      LocalRegion localRegion) {
+    final RegisterInterestTracker registerInterestTracker = 
localPool.getRITracker();
+
+    final ConcurrentHashMap<String, 
RegisterInterestTracker.RegionInterestEntry> keysConcurrentMap =
+        new ConcurrentHashMap<>();
+    when(registerInterestTracker.getRegionToInterestsMap(eq(InterestType.KEY), 
anyBoolean(),
+        anyBoolean())).thenReturn(
+            keysConcurrentMap);
+
+
+    for (InterestType interestType : InterestType.values()) {
+      final ConcurrentHashMap<String, 
RegisterInterestTracker.RegionInterestEntry> concurrentMap =
+          new ConcurrentHashMap<>();
+      when(registerInterestTracker.getRegionToInterestsMap(eq(interestType), 
anyBoolean(),
+          anyBoolean()))
+              .thenReturn(concurrentMap);
+      if (interestType.equals(InterestType.KEY)) {
+        RegisterInterestTracker.RegionInterestEntry registerInterestEntry =
+            new RegisterInterestTracker.RegionInterestEntry(localRegion);
+
+        registerInterestEntry.getInterests().put("bob", 
InterestResultPolicy.NONE);
+        concurrentMap.put("testRegion", registerInterestEntry);
+      }
+    }
+  }
+
   private static void assertPortEquals(int expected, Connection actual) {
     assertThat(actual.getServer().getPort()).isEqualTo(expected);
   }
@@ -312,7 +377,22 @@ public class QueueManagerJUnitTest {
     assertThat(actualPorts).isEqualTo(expectedPorts);
   }
 
-  private class DummyPool implements InternalPool {
+  private class RecoveryTestPool extends TestPool {
+    RegisterInterestTracker registerInterestTracker = 
mock(RegisterInterestTracker.class);
+
+    @Override
+    public Object executeOn(Connection con, Op op) {
+      opList.add(op);
+      return null;
+    }
+
+    @Override
+    public RegisterInterestTracker getRITracker() {
+      return registerInterestTracker;
+    }
+  }
+
+  private class TestPool implements InternalPool {
 
     @Override
     public String getPoolOrCacheCancelInProgress() {
@@ -604,19 +684,19 @@ public class QueueManagerJUnitTest {
   /**
    * A fake factory which returns a list of connections. Fake connections are 
created by calling
    * addConnection or add error. The factory maintains a queue of connections 
which will be handed
-   * out when the queue manager calls createClientToServerConnection. If a 
error was added, the
+   * out when the queue manager calls createClientToServerConnection. If an 
error was added, the
    * factory will return null instead.
    */
-  private class DummyFactory implements ConnectionFactory {
+  private class TestConnectionFactory implements ConnectionFactory {
 
-    private final LinkedList<DummyConnection> nextConnections = new 
LinkedList<>();
+    private final LinkedList<TestConnection> nextConnections = new 
LinkedList<>();
 
     private void addError() {
       nextConnections.add(null);
     }
 
-    private DummyConnection addConnection(int endpointType, int queueSize, int 
port) {
-      DummyConnection connection = new DummyConnection(endpointType, 
queueSize, port);
+    private TestConnection addConnection(int endpointType, int queueSize, int 
port) {
+      TestConnection connection = new TestConnection(endpointType, queueSize, 
port);
       nextConnections.add(connection);
       return connection;
     }
@@ -682,7 +762,7 @@ public class QueueManagerJUnitTest {
     }
   }
 
-  private class DummySource implements ConnectionSource {
+  private class TestConnectionSource implements ConnectionSource {
 
     private final AtomicInteger nextPort = new AtomicInteger();
 
@@ -700,9 +780,7 @@ public class QueueManagerJUnitTest {
     @Override
     public List<ServerLocation> findServersForQueue(Set excludedServers, int 
numServers,
         ClientProxyMembershipID proxyId, boolean findDurableQueue) {
-      numServers = numServers > factory.nextConnections.size()
-          ? factory.nextConnections.size()
-          : numServers;
+      numServers = Math.min(numServers, factory.nextConnections.size());
       List<ServerLocation> locations = new ArrayList<>(numServers);
       for (int i = 0; i < numServers; i++) {
         locations.add(findServer(null));
@@ -736,14 +814,14 @@ public class QueueManagerJUnitTest {
     }
   }
 
-  private class DummyConnection implements Connection {
+  private class TestConnection implements Connection {
 
     private final ServerQueueStatus status;
     private final ServerLocation location;
     private final Endpoint endpoint;
     private boolean keepAlive;
 
-    private DummyConnection(int endpointType, int queueSize, int port) {
+    private TestConnection(int endpointType, int queueSize, int port) {
       InternalDistributedMember member = new 
InternalDistributedMember("localhost", 555);
       status = new ServerQueueStatus((byte) endpointType, queueSize, member, 
0);
       location = new ServerLocation("localhost", port);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 0dd3f17a96..4474b6ba88 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -109,6 +109,10 @@ public class QueueManagerImpl implements QueueManager {
   private ScheduledExecutorService recoveryThread;
   private volatile boolean sentClientReady;
 
+  void clearQueueConnections() {
+    this.queueConnections = new ConnectionList();
+  }
+
   // queueConnections in maintained by using copy-on-write
   private volatile ConnectionList queueConnections = new ConnectionList();
   private volatile RedundancySatisfierTask redundancySatisfierTask = null;
@@ -309,6 +313,16 @@ public class QueueManagerImpl implements QueueManager {
   }
 
 
+  /*
+   * This is for test only, it does bad stuff that one should only do in test,
+   * if ever.
+   */
+  @VisibleForTesting
+  public void setSendClientReadyInTestOnly() {
+    synchronized (lock) {
+      sentClientReady = true;
+    }
+  }
 
   @Override
   public void readyForEvents(InternalDistributedSystem system) {
@@ -540,6 +554,9 @@ public class QueueManagerImpl implements QueueManager {
           // couldn't find a server to make primary
           break;
         }
+
+        markQueueAsReadyForEvents(primaryQueue);
+
         if (!addToConnectionList(primaryQueue, true)) {
           excludedServers.add(primaryQueue.getServer());
           primaryQueue = null;
@@ -678,21 +695,10 @@ public class QueueManagerImpl implements QueueManager {
             if (recoverInterest && queueConnections.getPrimary() == null
                 && queueConnections.getBackups().isEmpty()) {
               // we lost our queue at some point. We Need to recover
-              // interest. This server will be made primary after this method
-              // finishes
-              // because whoever killed the primary when this method started
-              // should
+              // interest. This server will be made primary after this method 
finishes
+              // because whoever killed the primary when this method started 
should
               // have scheduled a task to recover the primary.
               isFirstNewConnection = true;
-              // TODO - Actually, we need a better check than the above. 
There's
-              // still a chance
-              // that we haven't realized that the primary has died but it is
-              // already gone. We should
-              // get some information from the queue server about whether it 
was
-              // able to copy the
-              // queue from another server and decide if we need to recover our
-              // interest based on
-              // that information.
             }
           }
           boolean promotionFailed = false;
@@ -727,7 +733,7 @@ public class QueueManagerImpl implements QueueManager {
   }
 
   @VisibleForTesting
-  QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
+  public QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
     QueueConnectionImpl primary = null;
     for (int i = 0; primary == null && i < backups.size(); i++) {
       QueueConnectionImpl lastConnection = (QueueConnectionImpl) 
backups.get(i);
@@ -805,10 +811,13 @@ public class QueueManagerImpl implements QueueManager {
       excludedServers.addAll(servers);
     }
 
-    if (primary != null && sentClientReady && primary.sendClientReady()) {
+    return primary;
+  }
+
+  public void markQueueAsReadyForEvents(@NotNull QueueConnectionImpl primary) {
+    if (sentClientReady && primary.sendClientReady()) {
       readyForEventsAfterFailover(primary);
     }
-    return primary;
   }
 
   private List<ServerLocation> findQueueServers(Set<ServerLocation> 
excludedServers, int count,
@@ -848,7 +857,7 @@ public class QueueManagerImpl implements QueueManager {
    * to find a new server.
    */
   @VisibleForTesting
-  void recoverPrimary(Set<ServerLocation> excludedServers) {
+  public void recoverPrimary(Set<ServerLocation> excludedServers) {
     if (pool.getPoolOrCacheCancelInProgress() != null) {
       return;
     }
@@ -888,7 +897,6 @@ public class QueueManagerImpl implements QueueManager {
         }
         newPrimary = null;
       }
-
     }
 
     if (newPrimary != null) {
@@ -915,6 +923,7 @@ public class QueueManagerImpl implements QueueManager {
         // could not find a new primary to create
         break;
       }
+
       if (!addToConnectionList(newPrimary, true)) {
         excludedServers.add(newPrimary.getServer());
         newPrimary = null;
@@ -931,6 +940,9 @@ public class QueueManagerImpl implements QueueManager {
           excludedServers.add(newPrimary.getServer());
           newPrimary = null;
         }
+
+        markQueueAsReadyForEvents(newPrimary);
+
         // New primary queue was found from a non backup, alert the affected 
cqs
         cqsConnected();
       }
@@ -986,7 +998,7 @@ public class QueueManagerImpl implements QueueManager {
   // so before putting connection need to see if something(crash) happen we 
should be able to
   // recover from it
   @VisibleForTesting
-  boolean addToConnectionList(QueueConnectionImpl connection, boolean 
isPrimary) {
+  public boolean addToConnectionList(QueueConnectionImpl connection, boolean 
isPrimary) {
     boolean isBadConnection;
     synchronized (lock) {
       ClientUpdater cu = connection.getUpdater();
@@ -1029,7 +1041,7 @@ public class QueueManagerImpl implements QueueManager {
   }
 
   @VisibleForTesting
-  void scheduleRedundancySatisfierIfNeeded(long delay) {
+  public void scheduleRedundancySatisfierIfNeeded(long delay) {
     if (shuttingDown) {
       return;
     }
@@ -1111,10 +1123,9 @@ public class QueueManagerImpl implements QueueManager {
   }
 
   private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
-    Map cqs = getPool().getRITracker().getCqsMap();
-    for (Object o : cqs.entrySet()) {
-      Map.Entry e = (Map.Entry) o;
-      ClientCQ cqi = (ClientCQ) e.getKey();
+    Map<ClientCQ, Boolean> cqs = getPool().getRITracker().getCqsMap();
+    for (Map.Entry<ClientCQ, Boolean> e : cqs.entrySet()) {
+      ClientCQ cqi = e.getKey();
       String name = cqi.getName();
       if (pool.getMultiuserAuthentication()) {
         UserAttributes.userAttributes
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
index d93a69ca99..2d8f1515c0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.client.internal;
 
 import org.jetbrains.annotations.NotNull;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 
@@ -40,7 +41,8 @@ public class ReadyForEventsOp {
     // no instances allowed
   }
 
-  private static class ReadyForEventsOpImpl extends AbstractOp {
+  @VisibleForTesting
+  public static class ReadyForEventsOpImpl extends AbstractOp {
     /**
      * @throws org.apache.geode.SerializationException if serialization fails
      */
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
index dc7fb3c001..33ee30a7ff 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.jetbrains.annotations.NotNull;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.InterestResultPolicy;
 import 
org.apache.geode.cache.client.internal.RegisterInterestOp.RegisterInterestOpImpl;
@@ -105,7 +106,8 @@ public class RegisterInterestListOp {
     return uncheckedCast(pool.executeOn(conn, op));
   }
 
-  private static class RegisterInterestListOpImpl extends 
RegisterInterestOpImpl {
+  @VisibleForTesting
+  public static class RegisterInterestListOpImpl extends 
RegisterInterestOpImpl {
     /**
      * @throws org.apache.geode.SerializationException if serialization fails
      */
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
index e0d1bb1bff..8a08a81ce6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
@@ -22,14 +22,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
 
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
 
 import org.apache.geode.InternalGemFireError;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.query.CqQuery;
-import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
+import org.apache.geode.cache.query.internal.cq.ClientCQ;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.tier.InterestType;
 import org.apache.geode.internal.cache.tier.sockets.UnregisterAllInterest;
@@ -52,7 +53,7 @@ public class RegisterInterestTracker {
   private final FailoverInterestList[] failoverInterestLists = new 
FailoverInterestList[4];
 
   /** Manages CQs */
-  private final ConcurrentMap<CqQuery, Boolean> cqs = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<ClientCQ, Boolean> cqs = new 
ConcurrentHashMap<>();
 
   public RegisterInterestTracker() {
     failoverInterestLists[interestListIndex] = new FailoverInterestList();
@@ -106,7 +107,8 @@ public class RegisterInterestTracker {
     return result;
   }
 
-  void addSingleInterest(final @NotNull LocalRegion r, final @NotNull Object 
key,
+  @VisibleForTesting
+  public void addSingleInterest(final @NotNull LocalRegion r, final @NotNull 
Object key,
       final @NotNull InterestType interestType,
       final @NotNull InterestResultPolicy pol, final boolean isDurable,
       boolean receiveUpdatesAsInvalidates) {
@@ -146,15 +148,15 @@ public class RegisterInterestTracker {
     }
   }
 
-  public void addCq(InternalCqQuery cqi, boolean isDurable) {
+  public void addCq(ClientCQ cqi, boolean isDurable) {
     cqs.put(cqi, isDurable);
   }
 
-  public void removeCq(InternalCqQuery cqi) {
+  public void removeCq(ClientCQ cqi) {
     cqs.remove(cqi);
   }
 
-  Map<CqQuery, Boolean> getCqsMap() {
+  Map<ClientCQ, Boolean> getCqsMap() {
     return cqs;
   }
 
@@ -302,6 +304,42 @@ public class RegisterInterestTracker {
     return mapOfInterest.get(regionName);
   }
 
+  /**
+   * Iterate InterestTypes searching for any with the input 
interestResultPolicy
+   *
+   * @return boolean based on the presence of the selected InterestResultPolicy
+   */
+  public boolean hasInterestsWithResultPolicy(final @NotNull String 
regionName, boolean isDurable,
+      final @NotNull InterestResultPolicy interestResultPolicy) {
+    return Stream.of(InterestType.values())
+        .anyMatch(interestType -> hasInterestsWithResultPolicy(regionName, 
isDurable,
+            interestResultPolicy, interestType));
+  }
+
+  /**
+   * Check the RegionInterestEntries with receiveUpdatesAsInvalidates both 
true and false
+   *
+   * @return boolean based on the presence of the selected InterestResultPolicy
+   */
+  public boolean hasInterestsWithResultPolicy(final @NotNull String 
regionName, boolean isDurable,
+      final @NotNull InterestResultPolicy interestResultPolicy,
+      final @NotNull InterestType interestType) {
+
+    return Stream.of(true, false)
+        .anyMatch(receiveUpdatesAsInvalidates -> 
hasInterestsWithResultPolicy(regionName, isDurable,
+            interestResultPolicy, interestType, receiveUpdatesAsInvalidates));
+  }
+
+  private boolean hasInterestsWithResultPolicy(final @NotNull String 
regionName, boolean isDurable,
+      final @NotNull InterestResultPolicy interestResultPolicy,
+      final @NotNull InterestType interestType, boolean 
receiveUpdatesAsInvalidates) {
+    RegionInterestEntry regionInterestEntry =
+        readRegionInterests(regionName, interestType, isDurable, 
receiveUpdatesAsInvalidates);
+    return regionInterestEntry != null && 
regionInterestEntry.getInterests().values().stream()
+        .anyMatch(actualInterestResultPolicy -> actualInterestResultPolicy
+            .getOrdinal() == interestResultPolicy.getOrdinal());
+  }
+
   /**
    * A Holder object for client's register interest, this is required when a 
client fails over to
    * another server and does register interest based on this Data structure
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index ba4119903b..5b57d77649 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -83,7 +83,7 @@ public class ServerRegionProxy extends ServerProxy implements 
ServerRegionDataAc
    * Used by tests to create proxies for "fake" regions. Also, used by 
ClientStatsManager for admin
    * region.
    */
-  public ServerRegionProxy(String regionName, PoolImpl pool) {
+  public ServerRegionProxy(String regionName, InternalPool pool) {
     super(pool);
     region = null;
     this.regionName = regionName;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8e7cf5e581..dbc6006088 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5335,7 +5335,7 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     // If the marker has been processed, process this put event normally;
     // otherwise, this event occurred in the past and has been stored for a
     // durable client. In this case, just invoke the put callbacks.
-    if (processedMarker) {
+    if (processedMarker || hasRegisterInterestsWithResultPolicy(true, 
InterestResultPolicy.NONE)) {
       boolean ifNew = false; // can overwrite an existing key
       boolean ifOld = false; // can create a new key
       long lastModified = 0L; // use now
@@ -5352,6 +5352,12 @@ public class LocalRegion extends AbstractRegion 
implements LoaderHelperFactory,
     }
   }
 
+  private boolean hasRegisterInterestsWithResultPolicy(boolean isDurable,
+      InterestResultPolicy interestResultPolicy) {
+    return 
getServerProxy().getPool().getRITracker().hasInterestsWithResultPolicy(fullPath,
+        isDurable, interestResultPolicy);
+  }
+
   /**
    * Perform an invalidate in a bridge client. The op is from the cache server 
and should not be
    * distributed back to it.
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
new file mode 100644
index 0000000000..16d5eb34e6
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static 
org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Function;
+
+import org.junit.Rule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.RegisterInterestTracker;
+import org.apache.geode.cache.client.internal.ServerRegionProxy;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.InterestType;
+
+public class LocalRegionUpdateUnitTest {
+  private InternalDataView internalDataView;
+  private LocalRegion region;
+  private RegisterInterestTracker registerInterestTracker;
+
+  @Rule
+  public MockitoRule mockitoRule = 
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+  @BeforeEach
+  public void setUp() {
+    internalDataView = mock(InternalDataView.class);
+
+    EntryEventFactory entryEventFactory = mock(EntryEventFactory.class);
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem internalDistributedSystem = 
mock(InternalDistributedSystem.class);
+    InternalRegionArguments internalRegionArguments = 
mock(InternalRegionArguments.class);
+
+    LocalRegion.RegionMapConstructor regionMapConstructor =
+        mock(LocalRegion.RegionMapConstructor.class);
+    Function<LocalRegion, RegionPerfStats> regionPerfStatsFactory = 
localRegion -> {
+      localRegion.getLocalSize();
+      return mock(RegionPerfStats.class);
+    };
+
+    final PoolImpl poolImpl = mock(PoolImpl.class);
+    SubscriptionAttributes subscriptionAttributes =
+        new SubscriptionAttributes(InterestPolicy.ALL);
+
+
+    
when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getClock()).thenReturn(mock(DSClock.class));
+
+    when(regionMapConstructor.create(any(), any(), 
any())).thenReturn(mock(RegionMap.class));
+
+    InternalRegionFactory<Object, Object> regionFactory = new 
InternalRegionFactory<>(cache);
+    regionFactory.setDataPolicy(DataPolicy.NORMAL);
+    regionFactory.setPoolName("Pool1");
+    regionFactory.setConcurrencyChecksEnabled(false);
+    regionFactory.setSubscriptionAttributes(subscriptionAttributes);
+    RegionAttributes<Object, Object> regionAttributes =
+        regionFactory.getCreateAttributes();
+
+    registerInterestTracker = new RegisterInterestTracker();
+    when(poolImpl.getRITracker()).thenReturn(registerInterestTracker);
+
+    ServerRegionProxy serverRegionProxy = mock(ServerRegionProxy.class);
+    when(serverRegionProxy.getPool()).thenReturn(poolImpl);
+    LocalRegion.ServerRegionProxyConstructor proxyConstructor = region -> 
serverRegionProxy;
+
+    AbstractRegion.PoolFinder poolFinder = poolName -> poolImpl;
+
+    region = spy(new LocalRegion("region", regionAttributes, null, cache,
+        internalRegionArguments, internalDataView, regionMapConstructor, 
proxyConstructor,
+        entryEventFactory, poolFinder, regionPerfStatsFactory, 
disabledClock()));
+
+  }
+
+  /*
+   * As indicated by the name this code tests the basicBridgeClientUpdate 
method's
+   * fork where it checks to see if the interestResultPolicy is NONE, then it 
will
+   * call basicUpdate.
+   */
+  @Test
+  public void 
basicBridgeClientUpdateChecksInterestResultPolicyNoneThenUpdates() {
+    Object key = new Object();
+    Object value = new Object();
+
+    registerInterestTracker.addSingleInterest(region, key, InterestType.KEY,
+        InterestResultPolicy.NONE, true, false);
+
+    region.basicBridgeClientUpdate(null, key, value, new byte[] {'0'}, true,
+        null, true, false, new EntryEventImpl(),
+        new EventID(new byte[] {1}, 1, 1));
+
+    verify(internalDataView).putEntry(any(), anyBoolean(),
+        anyBoolean(), any(), anyBoolean(), anyLong(), anyBoolean(), 
anyBoolean(), anyBoolean());
+
+  }
+
+  /*
+   * As indicated by the name this code tests the basicBridgeClientUpdate 
method's
+   * fork where it checks to see if the interestResultPolicy is KEY, then it 
will
+   * not call basicUpdate.
+   */
+  @Test
+  public void 
basicBridgeClientUpdateChecksInterestResultPolicyKeyThenDoesNotUpdate() {
+    Object key = new Object();
+    Object value = new Object();
+
+    registerInterestTracker.addSingleInterest(region, key, InterestType.KEY,
+        InterestResultPolicy.KEYS, true, false);
+
+    region.basicBridgeClientUpdate(null, key, value, new byte[] {'0'}, true,
+        null, true, false, new EntryEventImpl(),
+        new EventID(new byte[] {1}, 1, 1));
+
+    verify(internalDataView, times(0)).putEntry(any(), anyBoolean(),
+        anyBoolean(), any(), anyBoolean(), anyLong(), anyBoolean(), 
anyBoolean(), anyBoolean());
+
+  }
+
+}
diff --git 
a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
 
b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
index c8e23e36bd..2bfcbf1ede 100644
--- 
a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
+++ 
b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
@@ -62,7 +62,7 @@ import org.apache.geode.test.version.VersionManager;
  *             instead.
  */
 public abstract class JUnit4DistributedTestCase implements 
DistributedTestFixture, Serializable {
-  private static final Logger logger = LogService.getLogger();
+  protected static final Logger logger = LogService.getLogger();
 
   /** This VM's connection to the distributed system */
   protected static InternalDistributedSystem system;

Reply via email to