This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 30bd1cef01 GEODE-9704: Ensure that register interest is called before
ready for events (#7442)
30bd1cef01 is described below
commit 30bd1cef01b555c84e970c548cfb0e55f06fbf1c
Author: mhansonp <[email protected]>
AuthorDate: Wed Apr 6 09:27:03 2022 -0700
GEODE-9704: Ensure that register interest is called before ready for events
(#7442)
- RegisterInterestOps need to happen before ReadyForEventsOp is sent
These changes make sure that happens.
- Added an InterestResultPolicyCheck
Authored-by: Barry Oglesby <[email protected]>
Un-ignored a test that will reproduce the issue
periodically during a number of runs. It is a flaky
test without the core fix.
Authored-by: Jinmei Liao <[email protected]>
---
.../tier/sockets/DurableRegistrationDUnitTest.java | 802 ---------------------
.../DurableRegistrationDistributedTest.java | 703 ++++++++++++++++++
...est.java => AuthExpirationDistributedTest.java} | 18 +-
...tTest.java => QueueManagerIntegrationTest.java} | 178 +++--
.../cache/client/internal/QueueManagerImpl.java | 59 +-
.../cache/client/internal/ReadyForEventsOp.java | 4 +-
.../client/internal/RegisterInterestListOp.java | 4 +-
.../client/internal/RegisterInterestTracker.java | 52 +-
.../cache/client/internal/ServerRegionProxy.java | 2 +-
.../apache/geode/internal/cache/LocalRegion.java | 8 +-
.../internal/cache/LocalRegionUpdateUnitTest.java | 149 ++++
.../dunit/internal/JUnit4DistributedTestCase.java | 2 +-
12 files changed, 1084 insertions(+), 897 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
deleted file mode 100644
index 51e69cff23..0000000000
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDUnitTest.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.Pool;
-import org.apache.geode.cache.client.PoolFactory;
-import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.CacheServerImpl;
-import org.apache.geode.internal.cache.FilterProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PoolFactoryImpl;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-
-/**
- * We have 2 servers and One client which registers some keys with durable
interest and some without
- * it. We maintain queues on only One server as redundancy level is one.
Following 2 tests have the
- * two TestCase scenarios
- *
- * There are two Tests First Test does the follows : // Step 1: Starting the
servers // Step 2:
- * Bring Up the Client // Step 3: Client registers Interests // Step 4: Update
Values on the Server
- * for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of
the DurableClient //
- * Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify
Updates on the Client
- * // Step 10 : Stop all VMs
- *
- * For Test 2 the steps are as follows // Step 1: Starting the servers // Step
2: Bring Up the
- * Client // Step 3: Client registers Interests // Step 4: Update Values on
the Server for Keys //
- * Step 5: Verify Updates on the Client // Step 6: Close Cache of the
DurableClient // Step 7:
- * Update the Values // Step 8: Re-start the Client // Step 9: Send Client
Ready Message // Step 10:
- * Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11:
Unregister Some Keys
- * (Here K1, K3) // Step 12: Modify values on the server for all the Keys //
Step 13: Check the
- * values for the ones not unregistered and the Unregistered Keys' Values
should be null
- */
-@Category({ClientSubscriptionTest.class})
-public class DurableRegistrationDUnitTest extends JUnit4DistributedTestCase {
-
- private VM server1VM;
-
- private VM server2VM;
-
- private VM durableClientVM;
-
- private String regionName;
-
- private int PORT1;
-
- private int PORT2;
-
- private static final String K1 = "KEY_STONE1";
-
- private static final String K2 = "KEY_STONE2";
-
- private static final String K3 = "KEY_STONE3";
-
- private static final String K4 = "KEY_STONE4";
-
- public DurableRegistrationDUnitTest() {
- super();
- }
-
- @Override
- public final void postSetUp() throws Exception {
- Host host = Host.getHost(0);
- server1VM = host.getVM(0);
- server2VM = host.getVM(1);
- durableClientVM = host.getVM(2);
- regionName = DurableRegistrationDUnitTest.class.getName() + "_region";
- CacheServerTestUtil.disableShufflingOfEndpoints();
- }
-
- @Test
- public void testSimpleDurableClient() {
-
- // Step 1: Starting the servers
- PORT1 = server1VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
- PORT2 = server2VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
-
- // Step 2: Bring Up the Client
- // Start a durable client that is not kept alive on the server when it
- // stops normally
- final String durableClientId = getName() + "_client";
-
- final int durableClientTimeout = 600; // keep the client alive for 600
- // seconds
- durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2, true,
- 0),
- regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout),
- Boolean.TRUE));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // Step 3: Client registers Interests
- // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
- // KEY_STONE4 as non-durableKeys
-
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3,
Boolean.TRUE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4,
Boolean.TRUE));
-
- // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
- // KEY_STONE3, KEY_STONE4
-
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"Value1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"Value2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"Value3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"Value4"));
-
- Wait.pause(1000);
- // Step 5: Verify Updates on the Client
-
- assertEquals("Value1", server2VM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertEquals("Value1", server1VM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
-
- assertEquals("Value1",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertEquals("Value2",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
- assertEquals("Value3",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K3)));
- assertEquals("Value4",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K4)));
-
- // Step 6: Close Cache of the DurableClient
- durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
- // pause(5000);
- // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
- // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"PingPong1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"PingPong2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"PingPong3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"PingPong4"));
-
- // Step 8: Re-start the Client
- durableClientVM
- .invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2,
- true, 0),
- regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- Wait.pause(5000);
-
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
-
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
-
- Wait.pause(5000);
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
-
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"PingPong_updated_1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"PingPong_updated_2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"PingPong_updated_3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"PingPong_updated_4"));
-
- Wait.pause(5000);
-
- // Step 9: Verify Updates on the Client
- assertEquals("PingPong_updated_1",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
- assertEquals("PingPong_updated_3",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K3)));
- assertEquals("PingPong_updated_4",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K4)));
-
- // Step 10 : Stop all VMs
- // Stop the durable client
- durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 2
- server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 1
- server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- }
-
- @Test
- public void testSimpleDurableClientWithRegistration() {
- // Step 1: Starting the servers
- PORT1 = server1VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
- PORT2 = server2VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
-
- // Step 2: Bring Up the Client
- // Start a durable client that is not kept alive on the server when it
- // stops normally
- final String durableClientId = getName() + "_client";
- // keep the client alive for 600 seconds
- final int durableClientTimeout = 600;
- durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2, true,
- 0),
- regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // Step 3: Client registers Interests
- // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
- // KEY_STONE4 as non-durableKeys
-
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3,
Boolean.TRUE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4,
Boolean.TRUE));
-
- // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
- // KEY_STONE3, KEY_STONE4
-
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"Value1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"Value2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"Value3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"Value4"));
-
- Wait.pause(1000);
- // Step 5: Verify Updates on the Client
-
- assertEquals("Value1", server2VM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertEquals("Value1", server1VM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
-
- assertEquals("Value1",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- assertEquals("Value2",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
- assertEquals("Value3",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K3)));
- assertEquals("Value4",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K4)));
-
- // Step 6: Close Cache of the DurableClient
- durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
- // pause(5000);
- // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
- // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"PingPong1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"PingPong2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"PingPong3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"PingPong4"));
-
- // Step 8: Re-start the Client
- durableClientVM
- .invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2,
- true, 0),
- regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
-
- // Step 9: Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // pause(1000);
-
- // Step 10: Register all Keys
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3,
Boolean.TRUE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4,
Boolean.TRUE));
-
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2,
Boolean.FALSE));
-
- // Step 11: Unregister Some Keys (Here K1, K3)
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.unregisterKey(K1));
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.unregisterKey(K3));
-
- Wait.pause(5000);
-
- // Step 12: Modify values on the server for all the Keys
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K1,
"PingPong_updated_1"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K2,
"PingPong_updated_2"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K3,
"PingPong_updated_3"));
- server2VM.invoke(() -> DurableRegistrationDUnitTest.putValue(K4,
"PingPong_updated_4"));
-
- Wait.pause(5000);
-
- // Step 13: Check the values for the ones not unregistered and the
- // Unregistered Keys' Values should be null
- try {
- assertEquals("PingPong_updated_2",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
- } catch (Exception e) {
- fail("Prob in KEY_STONE2: "
- + durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K2)));
- }
-
- try {
- assertEquals("PingPong_updated_4",
- durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K4)));
- } catch (Exception e) {
- fail("Prob in KEY_STONE4: "
- + durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K4)));
-
- }
-
- try {
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
- } catch (Exception e) {
- fail("Prob in KEY_STONE1: "
- + durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K1)));
-
- }
-
- try {
- assertNull(durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K3)));
- } catch (Exception e) {
- fail("Prob in KEY_STONE3: "
- + durableClientVM.invoke(() ->
DurableRegistrationDUnitTest.getValue(K3)));
-
- }
-
- // Stop the durable client
- durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 2
- server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 1
- server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- }
-
- @Test
- public void testDurableClientWithRegistrationHA() {
-
- // Step 1: Start server1
- PORT1 = server1VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
- PORT2 = getRandomAvailableTCPPort();
-
- // Step 2: Bring Up the Client
- final String durableClientId = getName() + "_client";
- // keep the client alive for 600 seconds
- final int durableClientTimeout = 600;
- durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2, true,
- 1),
- regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // Step 3: Client registers Interests
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3,
Boolean.TRUE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4,
Boolean.TRUE));
-
- // Step 4: Bring up the server2
-
- server2VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE, PORT2));
-
- Wait.pause(3000);
-
- // Check server2 got all the interests registered by the durable client.
- server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
- @Override
- public void run2() throws CacheException {
- LogWriterUtils.getLogWriter()
- .info("### Verifying interests registered by DurableClient. ###");
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- CacheClientProxy p = null;
-
- // Get proxy for the client.
- for (int i = 0; i < 60; i++) {
- Iterator ps = ccn.getClientProxies().iterator();
- if (!ps.hasNext()) {
- Wait.pause(1000);
- continue;
- } else {
- p = (CacheClientProxy) ps.next();
- break;
- }
- }
-
- if (p == null) {
- fail("Proxy initialization taking long time. Increase the wait
time.");
- }
-
- Iterator rs = p.getInterestRegisteredRegions().iterator();
- String rName = (String) rs.next();
- assertNotNull("Null region Name found.", rs);
- LocalRegion r = (LocalRegion)
GemFireCacheImpl.getInstance().getRegion(rName);
- assertNotNull("Null region found.", r);
- FilterProfile pf = r.getFilterProfile();
- Set intrests = Collections.EMPTY_SET;
-
- Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
- assertNotNull("durable Interests not found for the proxy",
interestKeys);
- assertEquals("The number of durable keys registered during HARegion
GII doesn't match.",
- interestKeys.size(), 2);
- interestKeys = pf.getKeysOfInterest(p.getProxyID());
- assertNotNull("non-durable Interests not found for the proxy",
interestKeys);
- assertEquals("The number of non-durable keys registered during
HARegion GII doesn't match.",
- interestKeys.size(), 2);
- }
- });
-
-
- // Stop the durable client
- durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 2
- server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 1
- server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- }
-
- @Test
- public void testDurableClientDisConnectWithRegistrationHA() {
-
- // Step 1: Start server1
- PORT1 = server1VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE));
- PORT2 = getRandomAvailableTCPPort();
-
- // Step 2: Bring Up the Client
- final String durableClientId = getName() + "_client";
- // keep the client alive for 600 seconds
- final int durableClientTimeout = 600;
- durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2, true,
- 1),
- regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // Step 3: Client registers Interests
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K1,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K2,
Boolean.FALSE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K3,
Boolean.TRUE));
- durableClientVM
- .invoke(() -> DurableRegistrationDUnitTest.registerKey(K4,
Boolean.TRUE));
-
- // Close Cache of the DurableClient
- durableClientVM.invoke(DurableRegistrationDUnitTest::closeCache);
-
- Wait.pause(2000);
-
- // Re-start the Client
- durableClientVM
- .invoke(() -> CacheServerTestUtil.createCacheClient(
-
getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), PORT1,
PORT2,
- true, 1),
- regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
-
- // Send clientReady message
- durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
- @Override
- public void run2() throws CacheException {
- CacheServerTestUtil.getCache().readyForEvents();
- }
- });
-
- // Step 4: Bring up the server2
- server2VM
- .invoke(() -> CacheServerTestUtil.createCacheServer(regionName,
Boolean.TRUE, PORT2));
-
- Wait.pause(3000);
-
- // Check server2 got all the interests registered by the durable client.
- server2VM.invoke(new CacheSerializableRunnable("Verify Interests.") {
- @Override
- public void run2() throws CacheException {
- LogWriterUtils.getLogWriter()
- .info("### Verifying interests registered by DurableClient. ###");
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- CacheClientProxy p = null;
-
- // Get proxy for the client.
- for (int i = 0; i < 60; i++) {
- Iterator ps = ccn.getClientProxies().iterator();
- if (!ps.hasNext()) {
- Wait.pause(1000);
- continue;
- } else {
- p = (CacheClientProxy) ps.next();
- break;
- }
- }
-
- if (p == null) {
- fail("Proxy initialization taking long time. Increase the wait
time.");
- }
-
- Iterator rs = p.getInterestRegisteredRegions().iterator();
- String rName = (String) rs.next();
- assertNotNull("Null region Name found.", rs);
- LocalRegion r = (LocalRegion)
GemFireCacheImpl.getInstance().getRegion(rName);
- assertNotNull("Null region found.", r);
- FilterProfile pf = r.getFilterProfile();
- Set intrests = Collections.EMPTY_SET;
-
- Set interestKeys = pf.getKeysOfInterest(p.getProxyID().getDurableId());
- assertNotNull("durable Interests not found for the proxy",
interestKeys);
- assertEquals("The number of durable keys registered during HARegion
GII doesn't match.",
- interestKeys.size(), 2);
- interestKeys = pf.getKeysOfInterest(p.getProxyID());
- assertNull("non-durable Interests found for the proxy", interestKeys);
- }
- });
-
-
- // Stop the durable client
- durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 2
- server2VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- // Stop server 1
- server1VM.invoke(() -> CacheServerTestUtil.closeCache());
-
- }
-
- private static void unregisterAllKeys() {
- // Get the region
- Region region = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region region =
- //
CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
- assertNotNull(region);
- region.unregisterInterest(K1);
- region.unregisterInterest(K2);
- region.unregisterInterest(K3);
- region.unregisterInterest(K4);
-
- }
-
- private static void registerKeys() throws Exception {
- try {
- // Get the region
- Region region = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region region =
- //
CacheServerTestUtil.getCache().getRegion(DurableClientSampleDUnitTest.regionName);
- assertNotNull(region);
-
- region.registerInterest(K1, InterestResultPolicy.KEYS_VALUES, false);
- region.registerInterest(K2, InterestResultPolicy.KEYS_VALUES, false);
- region.registerInterest(K3, InterestResultPolicy.KEYS_VALUES, true);
- region.registerInterest(K4, InterestResultPolicy.KEYS_VALUES, true);
-
- assertNotNull(region.getInterestList());
-
- } catch (Exception ex) {
- Assert.fail("failed while registering interest in registerKey function",
ex);
- }
- }
-
- private static String getValue(String key) {
- Region r = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
- assertNotNull(r);
- // String value = (String)r.get(key);
- // String value = (String)r.getEntry(key).getValue();
- Region.Entry re = r.getEntry(key);
-
- if (re == null) {
- return null;
- } else {
- return (String) re.getValue();
- }
- }
-
- private static void registerKey(String key, boolean isDurable) throws
Exception {
- try {
- // Get the region
- Region region = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region region =
- // CacheServerTestUtil.getCache().getRegion(regionName);
- assertNotNull(region);
- region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
- } catch (Exception ex) {
- Assert.fail("failed while registering interest in registerKey function",
ex);
- }
- }
-
- private static void unregisterKey(String key) throws Exception {
- try {
- // Get the region
- Region region = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region region =
- // CacheServerTestUtil.getCache().getRegion(regionName);
- assertNotNull(region);
- region.unregisterInterest(key);
- } catch (Exception ex) {
- Assert.fail("failed while registering interest in registerKey function",
ex);
- }
- }
-
- private static void putValue(String key, String value) {
- try {
- Region r = CacheServerTestUtil.getCache()
- .getRegion(DurableRegistrationDUnitTest.class.getName() + "_region");
- // Region r = CacheServerTestUtil.getCache().getRegion(regionName);
- assertNotNull(r);
- if (r.getEntry(key) != null) {
- r.put(key, value);
- } else {
- r.create(key, value);
- }
- assertEquals(value, r.getEntry(key).getValue());
- } catch (Exception e) {
-
- fail("Put in Server has some fight");
-
- }
- }
-
- private Pool getClientPool(String host, int server1Port, int server2Port,
- boolean establishCallbackConnection, int redundancyLevel) {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer(host, server1Port).addServer(host, server2Port)
- .setSubscriptionEnabled(establishCallbackConnection)
- .setSubscriptionRedundancy(redundancyLevel);
- return ((PoolFactoryImpl) pf).getPoolAttributes();
- }
-
- private Properties getClientDistributedSystemProperties(String
durableClientId) {
- return getClientDistributedSystemProperties(durableClientId,
- DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
- }
-
- private static void checkNumberOfClientProxies(final int expected) {
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return expected == getNumberOfClientProxies();
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- GeodeAwaitility.await().untilAsserted(ev);
- }
-
- protected static int getNumberOfClientProxies() {
- return
getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size();
- }
-
- private Properties getClientDistributedSystemProperties(String
durableClientId,
- int durableClientTimeout) {
- Properties properties = new Properties();
- properties.setProperty(MCAST_PORT, "0");
- properties.setProperty(LOCATORS, "");
- properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
- properties.setProperty(DURABLE_CLIENT_TIMEOUT,
String.valueOf(durableClientTimeout));
- return properties;
- }
-
- private static CacheClientProxy getClientProxy() {
- // Get the CacheClientNotifier
- CacheClientNotifier notifier =
getBridgeServer().getAcceptor().getCacheClientNotifier();
-
- // Get the CacheClientProxy or not (if proxy set is empty)
- CacheClientProxy proxy = null;
- Iterator i = notifier.getClientProxies().iterator();
- if (i.hasNext()) {
- proxy = (CacheClientProxy) i.next();
- }
- return proxy;
- }
-
- private static CacheServerImpl getBridgeServer() {
- CacheServerImpl bridgeServer =
- (CacheServerImpl)
CacheServerTestUtil.getCache().getCacheServers().iterator().next();
- assertNotNull(bridgeServer);
- return bridgeServer;
- }
-
- public static void closeCache() {
- Cache cache = CacheServerTestUtil.getCache();
- if (cache != null && !cache.isClosed()) {
- cache.close(true);
- cache.getDistributedSystem().disconnect();
- }
- }
-
- public String getRegionName() {
- return regionName;
- }
-
- public void setRegionName(String regionName) {
- this.regionName = regionName;
- }
-
- @Override
- public final void preTearDown() throws Exception {
- CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
- }
-}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
new file mode 100644
index 0000000000..627403fc38
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/DurableRegistrationDistributedTest.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheClient;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.createCacheServer;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.disableShufflingOfEndpoints;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getCache;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.getClientCache;
+import static
org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+/**
+ * We have 2 servers and One client which registers some keys with durable
interest and some without
+ * it. We maintain queues on only One server as redundancy level is one.
Following 2 tests have the
+ * two TestCase scenarios
+ *
+ * There are two Tests First Test does the follows : // Step 1: Starting the
servers // Step 2:
+ * Bring Up the Client // Step 3: Client registers Interests // Step 4: Update
Values on the Server
+ * for Keys // Step 5: Verify Updates on the Client // Step 6: Close Cache of
the DurableClient //
+ * Step 7: Update the Values // Step 8: Re-start the Client // Step 9: Verify
Updates on the Client
+ * // Step 10 : Stop all VMs
+ *
+ * For Test 2 the steps are as follows // Step 1: Starting the servers // Step
2: Bring Up the
+ * Client // Step 3: Client registers Interests // Step 4: Update Values on
the Server for Keys //
+ * Step 5: Verify Updates on the Client // Step 6: Close Cache of the
DurableClient // Step 7:
+ * Update the Values // Step 8: Re-start the Client // Step 9: Send Client
Ready Message // Step 10:
+ * Register all Keys (K1, K2 as Non-Durable. K3, K4 as Durable) // Step 11:
Unregister Some Keys
+ * (Here K1, K3) // Step 12: Modify values on the server for all the Keys //
Step 13: Check the
+ * values for the ones not unregistered and the Unregistered Keys' Values
should be null
+ */
+@Category({ClientSubscriptionTest.class})
+public class DurableRegistrationDistributedTest extends
JUnit4DistributedTestCase {
+
+ private VM server1VM;
+
+ private VM server2VM;
+
+ private VM durableClientVM;
+
+ private String regionName;
+ private String hostName;
+
+ private int PORT1;
+
+ private int PORT2;
+
+ private static final String K1 = "KEY_STONE1";
+
+ private static final String K2 = "KEY_STONE2";
+
+ private static final String K3 = "KEY_STONE3";
+
+ private static final String K4 = "KEY_STONE4";
+
+ public DurableRegistrationDistributedTest() {
+ super();
+ }
+
+ @Override
+ public final void postSetUp() {
+ server1VM = VM.getVM(0);
+ server2VM = VM.getVM(1);
+ durableClientVM = VM.getVM(2);
+ hostName = VM.getHostName();
+ regionName = DurableRegistrationDistributedTest.class.getName() +
"_region";
+ disableShufflingOfEndpoints();
+ }
+
+ @Test
+ public void testSimpleDurableClient() {
+
+ // Step 1: Starting the servers
+ PORT1 = server1VM.invoke(() -> createCacheServer(regionName,
Boolean.TRUE));
+ PORT2 = server2VM.invoke(() -> createCacheServer(regionName,
Boolean.TRUE));
+
+ // Step 2: Bring Up the Client
+ // Start a durable client that is not kept alive on the server when it
stops normally
+ final String durableClientId = getName() + "_client";
+
+ final int durableClientTimeout = 600; // keep the client alive for 600
+ // seconds
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2, true,
+ 0),
+ regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout),
+ Boolean.TRUE));
+
+
+
+ // Step 3: Client registers Interests
+ // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
+ // KEY_STONE4 as non-durableKeys
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+ durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
+ // KEY_STONE3, KEY_STONE4
+ server2VM.invoke(() -> putValue(K1, "Value1"));
+ server2VM.invoke(() -> putValue(K2, "Value2"));
+ server2VM.invoke(() -> putValue(K3, "Value3"));
+ server2VM.invoke(() -> putValue(K4, "Value4"));
+
+
+ GeodeAwaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+ .until(() -> server2VM.invoke(() ->
getValue(K1).contentEquals("Value1")));
+ // Step 5: Verify Updates on the Client
+ assertThat(server2VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+ assertThat(server1VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+
+ assertThat(durableClientVM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+ assertThat(durableClientVM.invoke(() -> getValue(K2))).isEqualTo("Value2");
+ assertThat(durableClientVM.invoke(() -> getValue(K3))).isEqualTo("Value3");
+ assertThat(durableClientVM.invoke(() -> getValue(K4))).isEqualTo("Value4");
+
+ // Step 6: Close Cache of the DurableClient
+ durableClientVM.invoke(this::closeCache);
+
+ // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
+ // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
+ server2VM.invoke(() -> putValue(K1, "PingPong1"));
+ server2VM.invoke(() -> putValue(K2, "PingPong2"));
+ server2VM.invoke(() -> putValue(K3, "PingPong3"));
+ server2VM.invoke(() -> putValue(K4, "PingPong4"));
+
+ // Step 8: Re-start the Client
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2,
+ true, 0),
+ regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
+ .until(() -> durableClientVM.invoke(() -> getValue(K1) == null));
+
+ assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+
+ GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
+ .until(() -> durableClientVM.invoke(() -> getValue(K1) == null));
+ assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+
+ server2VM.invoke(() -> putValue(K1, "PingPong_updated_1"));
+ server2VM.invoke(() -> putValue(K2, "PingPong_updated_2"));
+ server2VM.invoke(() -> putValue(K3, "PingPong_updated_3"));
+ server2VM.invoke(() -> putValue(K4, "PingPong_updated_4"));
+
+
+ // Step 9: Verify Updates on the Client
+ GeodeAwaitility.await().atMost(5000, TimeUnit.MILLISECONDS).until(
+ () -> durableClientVM.invoke(() ->
getValue(K1).contentEquals("PingPong_updated_1")));
+
+ assertThat(durableClientVM.invoke(() -> getValue(K2))).isNull();
+ assertThat(durableClientVM.invoke(() ->
getValue(K3))).isEqualTo("PingPong_updated_3");
+ assertThat(durableClientVM.invoke(() ->
getValue(K4))).isEqualTo("PingPong_updated_4");
+
+ // Step 10 : Stop all VMs
+ // Stop the durable client
+ durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 2
+ server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 1
+ server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ }
+
+ @Test
+ public void testSimpleDurableClientWithRegistration() {
+ // Step 1: Starting the servers
+ PORT1 = server1VM.invoke(() -> createCacheServer(regionName,
Boolean.TRUE));
+ PORT2 = server2VM.invoke(() -> createCacheServer(regionName,
Boolean.TRUE));
+
+ // Step 2: Bring Up the Client
+ // Start a durable client that is not kept alive on the server when it
stops normally
+ final String durableClientId = getName() + "_client";
+ // keep the client alive for 600 seconds
+ final int durableClientTimeout = 600;
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2, true,
+ 0),
+ regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
+
+ // Step 3: Client registers Interests
+ // KEY_STONE1, KEY_STONE2 are registered as durableKeys & KEY_STONE3,
+ // KEY_STONE4 as non-durableKeys
+
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+ durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 4: Update Values on the Server for KEY_STONE1, KEY_STONE2,
+ // KEY_STONE3, KEY_STONE4
+ server2VM.invoke(() -> putValue(K1, "Value1"));
+ server2VM.invoke(() -> putValue(K2, "Value2"));
+ server2VM.invoke(() -> putValue(K3, "Value3"));
+ server2VM.invoke(() -> putValue(K4, "Value4"));
+
+
+ // Step 5: Verify Updates on the Client
+ GeodeAwaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+ .until(() -> server2VM.invoke(() ->
getValue(K1).contentEquals("Value1")));
+ assertThat(server1VM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+
+ assertThat(durableClientVM.invoke(() -> getValue(K1))).isEqualTo("Value1");
+ assertThat(durableClientVM.invoke(() -> getValue(K2))).isEqualTo("Value2");
+ assertThat(durableClientVM.invoke(() -> getValue(K3))).isEqualTo("Value3");
+ assertThat(durableClientVM.invoke(() -> getValue(K4))).isEqualTo("Value4");
+
+ // Step 6: Close Cache of the DurableClient
+ durableClientVM.invoke(this::closeCache);
+
+ // Step 7: Update KEY_STONE1,KEY_STONE2,KEY_STONE3,KEY_STONE4 on the
+ // Server say with values PingPong1, PingPong2, PingPong3, PingPong4
+ server2VM.invoke(() -> putValue(K1, "PingPong1"));
+ server2VM.invoke(() -> putValue(K2, "PingPong2"));
+ server2VM.invoke(() -> putValue(K3, "PingPong3"));
+ server2VM.invoke(() -> putValue(K4, "PingPong4"));
+
+ // Step 8: Re-start the Client
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2, true, 0),
+ regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
+
+
+ // Step 9: Register all Keys
+ durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+ durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+
+
+ // Step 10: Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 11: Unregister Some Keys (Here K1, K3)
+ durableClientVM.invoke(() -> unregisterKey(K1));
+ durableClientVM.invoke(() -> unregisterKey(K3));
+
+ // Step 12: Modify values on the server for all the Keys
+ server2VM.invoke(() -> putValue(K1, "PingPong_updated_1"));
+ server2VM.invoke(() -> putValue(K2, "PingPong_updated_2"));
+ server2VM.invoke(() -> putValue(K3, "PingPong_updated_3"));
+ server2VM.invoke(() -> putValue(K4, "PingPong_updated_4"));
+
+ // Step 13: Check the values for the ones not unregistered and the
+ // Unregistered Keys' Values should be null
+ GeodeAwaitility.await("Prob in KEY_STONE2: ").atMost(5000,
TimeUnit.MILLISECONDS).until(
+ () -> durableClientVM.invoke(() ->
getValue(K2).contentEquals("PingPong_updated_2")));
+ assertThat(durableClientVM.invoke(() -> getValue(K4))).describedAs("Prob
in KEY_STONE4: ")
+ .isEqualTo("PingPong_updated_4");
+
+
+ assertThat(durableClientVM.invoke(() -> getValue(K1))).describedAs("Prob
in KEY_STONE1: ")
+ .isNull();
+ assertThat(durableClientVM.invoke(() -> getValue(K3))).describedAs("Prob
in KEY_STONE1: ")
+ .isEqualTo("PingPong3");
+
+ // Stop the durable client
+ durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 2
+ server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 1
+ server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ }
+
+ @Test
+ public void testDurableClientWithRegistrationHA() {
+
+ // Step 1: Start server1
+ PORT1 = server1VM.invoke(() -> createCacheServer(regionName,
Boolean.TRUE));
+ PORT2 = getRandomAvailableTCPPort();
+
+ // Step 2: Bring Up the Client
+ final String durableClientId = getName() + "_client";
+ // keep the client alive for 600 seconds
+ final int durableClientTimeout = 600;
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2, true,
+ 1),
+ regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 3: Client registers Interests
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+ durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+ // Step 4: Bring up the server2
+ server2VM.invoke(() -> createCacheServer(regionName, Boolean.TRUE, PORT2));
+
+ GeodeAwaitility.await().atMost(3000, TimeUnit.MILLISECONDS)
+ .until(() -> server2VM.invoke(() ->
getCache().getCacheServers().get(0).isRunning()));
+
+
+ // Check server2 got all the interests registered by the durable client.
+ server2VM.invoke("Verify Interests.", new CacheSerializableRunnable() {
+ @Override
+ public void run2() throws CacheException {
+ logger.info("### Verifying interests registered by DurableClient.
###");
+ CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+ CacheClientProxy p = null;
+
+ // Get proxy for the client.
+ for (int i = 0; i < 60; i++) {
+ Iterator<CacheClientProxy> ps = ccn.getClientProxies().iterator();
+ if (!ps.hasNext()) {
+ Wait.pause(1000);
+ } else {
+ p = ps.next();
+ break;
+ }
+ }
+
+ if (p == null) {
+ fail("Proxy initialization taking long time. Increase the wait
time.");
+ }
+
+ Iterator<String> rs = p.getInterestRegisteredRegions().iterator();
+ String rName = rs.next();
+ assertThat(rs).describedAs("Null region Name found.").isNotNull();
+
+ assertThat(p.getCache()).isEqualTo(GemFireCacheImpl.getInstance());
+ LocalRegion r = (LocalRegion)
GemFireCacheImpl.getInstance().getRegion(rName);
+ assertThat(r).describedAs("Null region found.").isNotNull();
+ FilterProfile pf = r.getFilterProfile();
+
+ Set<String> interestKeys =
pf.getKeysOfInterest(p.getProxyID().getDurableId());
+ assertThat(interestKeys).describedAs("durable Interests not found for
the proxy")
+ .isNotNull();
+ assertThat(interestKeys.size())
+ .describedAs("The number of durable keys registered during
HARegion GII doesn't match.")
+ .isEqualTo(2);
+ interestKeys = pf.getKeysOfInterest(p.getProxyID());
+ assertThat(interestKeys).describedAs("non-durable Interests not found
for the proxy")
+ .isNotNull();
+ assertThat(2)
+ .describedAs(
+ "The number of non-durable keys registered during HARegion GII
doesn't match.")
+ .isEqualTo(interestKeys.size());
+ }
+ });
+
+
+ // Stop the durable client
+ durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 2
+ server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 1
+ server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ }
+
+ @Test
+ public void testDurableClientDisConnectWithRegistrationHA() {
+
+ // Step 1: Start server1
+ PORT1 = server1VM
+ .invoke(() -> createCacheServer(regionName, Boolean.TRUE));
+ PORT2 = getRandomAvailableTCPPort();
+
+ // Step 2: Bring Up the Client
+ final String durableClientId = getName() + "_client";
+ // keep the client alive for 600 seconds
+ final int durableClientTimeout = 600;
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2, true,
+ 1),
+ regionName, getClientDistributedSystemProperties(durableClientId,
durableClientTimeout)));
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 3: Client registers Interests
+ durableClientVM.invoke(() -> registerKey(K1, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K2, Boolean.FALSE));
+ durableClientVM.invoke(() -> registerKey(K3, Boolean.TRUE));
+ durableClientVM.invoke(() -> registerKey(K4, Boolean.TRUE));
+
+ // Close Cache of the DurableClient
+ durableClientVM.invoke(this::closeCache);
+ GeodeAwaitility.await().until(() -> durableClientVM.invoke(() ->
getClientCache().isClosed()));
+
+ // Re-start the Client
+ durableClientVM.invoke(() -> createCacheClient(
+ getClientPool(hostName, PORT1, PORT2,
+ true, 1),
+ regionName, getClientDistributedSystemProperties(durableClientId),
Boolean.TRUE));
+
+ // Send clientReady message
+ durableClientVM.invoke("Send clientReady", new CacheSerializableRunnable()
{
+ @Override
+ public void run2() throws CacheException {
+ getClientCache().readyForEvents();
+ }
+ });
+
+ // Step 4: Bring up the server2
+ server2VM
+ .invoke(() -> createCacheServer(regionName, Boolean.TRUE, PORT2));
+
+ Wait.pause(3000);
+
+ // Check server2 got all the interests registered by the durable client.
+ server2VM.invoke("Verify Interests.", new CacheSerializableRunnable() {
+ @Override
+ public void run2() throws CacheException {
+ logger.info("### Verifying interests registered by DurableClient.
###");
+ CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+ CacheClientProxy p = null;
+
+ // Get proxy for the client.
+ for (int i = 0; i < 60; i++) {
+ Iterator<CacheClientProxy> ps = ccn.getClientProxies().iterator();
+ if (!ps.hasNext()) {
+ Wait.pause(1000);
+ } else {
+ p = ps.next();
+ break;
+ }
+ }
+
+ if (p == null) {
+ fail("Proxy initialization taking long time. Increase the wait
time.");
+ }
+
+ Iterator<String> rs = p.getInterestRegisteredRegions().iterator();
+ String rName = rs.next();
+ assertThat(rs).describedAs("Null region Name found.").isNotNull();
+ LocalRegion r = (LocalRegion)
GemFireCacheImpl.getInstance().getRegion(rName);
+ assertThat(r).describedAs("Null region found.").isNotNull();
+ FilterProfile pf = r.getFilterProfile();
+
+ Set<String> interestKeys =
pf.getKeysOfInterest(p.getProxyID().getDurableId());
+ assertThat(interestKeys).describedAs("durable Interests not found for
the proxy")
+ .isNotNull();
+ assertThat(2)
+ .describedAs("The number of durable keys registered during
HARegion GII doesn't match.")
+ .isEqualTo(interestKeys.size());
+ interestKeys = pf.getKeysOfInterest(p.getProxyID());
+ assertThat(interestKeys).describedAs("non-durable Interests found for
the proxy").isNull();
+ }
+ });
+
+
+ // Stop the durable client
+ durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 2
+ server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ // Stop server 1
+ server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+
+ }
+
+ private void registerKeys() {
+ try {
+ // Get the region
+ Region<String, String> region = getCache()
+ .getRegion(DurableRegistrationDistributedTest.class.getName() +
"_region");
+
+ assertThat(region).isNotNull();
+
+ region.registerInterest(K1, InterestResultPolicy.KEYS_VALUES, false);
+ region.registerInterest(K2, InterestResultPolicy.KEYS_VALUES, false);
+ region.registerInterest(K3, InterestResultPolicy.KEYS_VALUES, true);
+ region.registerInterest(K4, InterestResultPolicy.KEYS_VALUES, true);
+
+ assertThat(region.getInterestList()).isNotNull();
+
+ } catch (Exception ex) {
+ fail("failed while registering interest in registerKey function", ex);
+ }
+ }
+
+ private String getValue(String key) {
+ Region<String, String> r =
+
getCache().getRegion(DurableRegistrationDistributedTest.class.getName() +
"_region");
+
+ assertThat(r).isNotNull();
+
+ Region.Entry<String, String> re = r.getEntry(key);
+
+ if (re == null) {
+ return null;
+ } else {
+ return re.getValue();
+ }
+ }
+
+ private void registerKey(String key, boolean isDurable) {
+ try {
+ // Get the region
+ Region<String, String> region = getCache()
+ .getRegion(DurableRegistrationDistributedTest.class.getName() +
"_region");
+
+ assertThat(region).isNotNull();
+ region.registerInterest(key, InterestResultPolicy.NONE, isDurable);
+ } catch (Exception ex) {
+ fail("failed while registering interest in registerKey function", ex);
+ }
+ }
+
+ private void unregisterKey(String key) {
+ try {
+ // Get the region
+ Region<String, String> region = getCache()
+ .getRegion(DurableRegistrationDistributedTest.class.getName() +
"_region");
+
+ assertThat(region).isNotNull();
+ region.unregisterInterest(key);
+ } catch (Exception ex) {
+ fail("failed while registering interest in registerKey function", ex);
+ }
+ }
+
+ private void putValue(String key, String value) {
+ try {
+ Region<String, String> r = getCache()
+ .getRegion(DurableRegistrationDistributedTest.class.getName() +
"_region");
+
+ assertThat(r).isNotNull();
+ if (r.getEntry(key) != null) {
+ r.put(key, value);
+ } else {
+ r.create(key, value);
+ }
+ assertThat(r.getEntry(key).getValue()).isEqualTo(value);
+ } catch (Exception e) {
+
+ fail("Put in Server has some fight");
+
+ }
+ }
+
+ private Pool getClientPool(String host, int server1Port, int server2Port,
+ boolean establishCallbackConnection, int redundancyLevel) {
+ PoolFactory pf = PoolManager.createFactory();
+ pf.addServer(host, server1Port).addServer(host, server2Port)
+ .setSubscriptionEnabled(establishCallbackConnection)
+ .setSubscriptionRedundancy(redundancyLevel);
+ return ((PoolFactoryImpl) pf).getPoolAttributes();
+ }
+
+ private Properties getClientDistributedSystemProperties(String
durableClientId) {
+ return getClientDistributedSystemProperties(durableClientId,
+ DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
+ }
+
+ protected int getNumberOfClientProxies() {
+ return
getBridgeServer().getAcceptor().getCacheClientNotifier().getClientProxies().size();
+ }
+
+ private Properties getClientDistributedSystemProperties(String
durableClientId,
+ int durableClientTimeout) {
+ Properties properties = new Properties();
+ properties.setProperty(MCAST_PORT, "0");
+ properties.setProperty(LOCATORS, "");
+ properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
+ properties.setProperty(DURABLE_CLIENT_TIMEOUT,
String.valueOf(durableClientTimeout));
+ return properties;
+ }
+
+ private CacheClientProxy getClientProxy() {
+ // Get the CacheClientNotifier
+ CacheClientNotifier notifier =
getBridgeServer().getAcceptor().getCacheClientNotifier();
+
+ // Get the CacheClientProxy or not (if proxy set is empty)
+ CacheClientProxy proxy = null;
+ Iterator<CacheClientProxy> i = notifier.getClientProxies().iterator();
+ if (i.hasNext()) {
+ proxy = i.next();
+ }
+ return proxy;
+ }
+
+ private CacheServerImpl getBridgeServer() {
+ CacheServerImpl bridgeServer =
+ (CacheServerImpl) getCache().getCacheServers().iterator().next();
+ assertThat(bridgeServer).isNotNull();
+ return bridgeServer;
+ }
+
+ public void closeCache() {
+ ClientCache cache = getClientCache();
+ if (cache != null && !cache.isClosed()) {
+ cache.close(true);
+ }
+ }
+
+ public String getRegionName() {
+ return regionName;
+ }
+
+ public void setRegionName(String regionName) {
+ this.regionName = regionName;
+ }
+
+ @Override
+ public final void preTearDown() {
+ resetDisableShufflingOfEndpointsFlag();
+ }
+}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
similarity index 95%
rename from
geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
rename to
geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
index bf38db0a6b..8a3a1de11b 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/security/AuthExpirationDistributedTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.After;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -50,7 +49,7 @@ import org.apache.geode.test.junit.categories.SecurityTest;
import org.apache.geode.test.junit.rules.ServerStarterRule;
@Category({SecurityTest.class})
-public class AuthExpirationDUnitTest {
+public class AuthExpirationDistributedTest {
@Rule
public ClusterStartupRule cluster = new ClusterStartupRule();
@@ -119,7 +118,8 @@ public class AuthExpirationDUnitTest {
}
@Test
- public void registeredInterest_slowReAuth_policyDefault() throws Exception {
+ public void
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyDefault()
+ throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT,
UpdatableUserAuthInitialize.class.getName())
@@ -171,8 +171,8 @@ public class AuthExpirationDUnitTest {
}
@Test
- @Ignore("unnecessary test case for re-auth, but it manifests GEODE-9704")
- public void registeredInterest_slowReAuth_policyKeys_durableClient() throws
Exception {
+ public void
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_durableClient()
+ throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT,
UpdatableUserAuthInitialize.class.getName())
@@ -180,14 +180,13 @@ public class AuthExpirationDUnitTest {
.withPoolSubscription(true)
.withServerConnection(serverPort));
-
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
ClientCache clientCache = ClusterStartupRule.getClientCache();
Region<Object, Object> region = clientCache
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
- region.registerInterestForAllKeys(InterestResultPolicy.KEYS, true);
+ region.registerInterestForAllKeys(InterestResultPolicy.NONE, true);
clientCache.readyForEvents();
UpdatableUserAuthInitialize.setUser("user11");
// wait for time longer than server's max time to wait to re-authenticate
@@ -215,7 +214,8 @@ public class AuthExpirationDUnitTest {
private static KeysCacheListener myListener = new KeysCacheListener();
@Test
- public void registeredInterest_slowReAuth_policyNone_durableClient() throws
Exception {
+ public void
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_CacheListener_durableClient()
+ throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT,
UpdatableUserAuthInitialize.class.getName())
@@ -260,7 +260,7 @@ public class AuthExpirationDUnitTest {
@Test
- public void registeredInterest_slowReAuth_policyNone_nonDurableClient()
+ public void
registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_nonDurableClient()
throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
similarity index 78%
rename from
geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
rename to
geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
index dbdd1236ad..143dfeaf53 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/QueueManagerIntegrationTest.java
@@ -23,7 +23,8 @@ import static
org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -35,22 +36,27 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
+import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.SocketFactory;
import org.apache.geode.cache.client.SubscriptionNotEnabledException;
@@ -59,26 +65,28 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PoolStats;
+import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.LocalLogWriter;
-import org.apache.geode.test.junit.categories.ClientServerTest;
-@Category(ClientServerTest.class)
-public class QueueManagerJUnitTest {
+@Tag("ClientServerTest")
+public class QueueManagerIntegrationTest {
- private DummyPool pool;
+ private TestPool pool;
private LocalLogWriter logger;
private DistributedSystem ds;
private EndpointManagerImpl endpoints;
- private DummySource source;
- private DummyFactory factory;
- private QueueManager manager;
+ private TestConnectionSource source;
+ private TestConnectionFactory factory;
+ private QueueManagerImpl manager;
private ScheduledExecutorService background;
private PoolStats stats;
+ private List<Op> opList;
- @Before
+ @BeforeEach
public void setUp() {
logger = new LocalLogWriter(FINEST.intLevel(), System.out);
@@ -89,17 +97,17 @@ public class QueueManagerJUnitTest {
ds = DistributedSystem.connect(properties);
stats = new PoolStats(ds, "QueueManagerJUnitTest");
- pool = new DummyPool();
+ pool = new TestPool();
endpoints = new EndpointManagerImpl("pool", ds, ds.getCancelCriterion(),
pool.getStats());
- source = new DummySource();
- factory = new DummyFactory();
+ source = new TestConnectionSource();
+ factory = new TestConnectionFactory();
background = Executors.newSingleThreadScheduledExecutor();
-
+ opList = new LinkedList<>();
addIgnoredException("Could not find any server to host primary client
queue.");
addIgnoredException("Could not find any server to host redundant client
queue.");
}
- @After
+ @AfterEach
public void tearDown() {
background.shutdownNow();
manager.close(false);
@@ -195,9 +203,8 @@ public class QueueManagerJUnitTest {
assertPortEquals(2, manager.getAllConnections().getPrimary());
- await().untilAsserted(() -> {
- assertPortEquals(new int[] {3, 4, 5},
manager.getAllConnections().getBackups());
- });
+ await().untilAsserted(
+ () -> assertPortEquals(new int[] {3, 4, 5},
manager.getAllConnections().getBackups()));
}
@Test
@@ -216,18 +223,16 @@ public class QueueManagerJUnitTest {
assertPortEquals(1, manager.getAllConnections().getPrimary());
- await().untilAsserted(() -> {
- assertPortEquals(new int[] {2, 3},
manager.getAllConnections().getBackups());
- });
+ await().untilAsserted(
+ () -> assertPortEquals(new int[] {2, 3},
manager.getAllConnections().getBackups()));
Connection backup = manager.getAllConnections().getBackups().get(0);
backup.destroy();
assertPortEquals(1, manager.getAllConnections().getPrimary());
- await().untilAsserted(() -> {
- assertPortEquals(new int[] {3, 4},
manager.getAllConnections().getBackups());
- });
+ await().untilAsserted(
+ () -> assertPortEquals(new int[] {3, 4},
manager.getAllConnections().getBackups()));
}
@Test
@@ -238,17 +243,13 @@ public class QueueManagerJUnitTest {
manager.start(background);
manager.getAllConnections().getPrimary().destroy();
- Throwable thrown = catchThrowable(() -> {
- manager.getAllConnections().getPrimary();
- });
-
assertThat(thrown).isInstanceOf(NoSubscriptionServersAvailableException.class);
-
+ assertThatThrownBy(() -> manager.getAllConnections().getPrimary())
+ .isInstanceOf(NoSubscriptionServersAvailableException.class);
factory.addConnection(0, 0, 2);
factory.addConnection(0, 0, 3);
- await().untilAsserted(() -> {
- assertThatCode(() ->
manager.getAllConnections()).doesNotThrowAnyException();
- });
+ await().untilAsserted(
+ () -> assertThatCode(() ->
manager.getAllConnections()).doesNotThrowAnyException());
assertPortEquals(2, manager.getAllConnections().getPrimary());
}
@@ -258,7 +259,7 @@ public class QueueManagerJUnitTest {
String serverRefusedConnectionExceptionMessage =
"Peer or client version with ordinal x not supported. Highest known
version is x.x.x.";
// Spy the factory so that the createClientToServerConnection method can
be mocked
- DummyFactory factorySpy = spy(factory);
+ TestConnectionFactory factorySpy = spy(factory);
manager = new QueueManagerImpl(pool, endpoints, source, factorySpy, 2, 20,
logger,
ClientProxyMembershipID.getNewProxyMembership(ds));
// Cause a ServerRefusedConnectionException to be thrown from
createClientToServerConnection
@@ -277,8 +278,8 @@ public class QueueManagerJUnitTest {
@Test
public void
testAddToConnectionListCallsCloseConnectionOpWithKeepAliveTrue2() {
- // Create a DummyConnection
- DummyConnection connection = factory.addConnection(0, 0, 1);
+ // Create a TestConnection
+ TestConnection connection = factory.addConnection(0, 0, 1);
assertThat(connection.keepAlive).isFalse();
// Get and close its Endpoint
@@ -294,6 +295,70 @@ public class QueueManagerJUnitTest {
assertThat(connection.keepAlive).isTrue();
}
+ @Test
+ public void recoverPrimaryRegistersBeforeSendingReady() {
+ Set<ServerLocation> excludedServers = new HashSet<>();
+ excludedServers.add(new ServerLocation("localhost", 1));
+ excludedServers.add(new ServerLocation("localhost", 2));
+ excludedServers.add(new ServerLocation("localhost", 3));
+ factory.addConnection(0, 0, 1);
+ factory.addConnection(0, 0, 2);
+ factory.addConnection(0, 0, 3);
+
+ LocalRegion testRegion = mock(LocalRegion.class);
+
+ InternalPool pool = new RecoveryTestPool();
+ ServerRegionProxy serverRegionProxy = new ServerRegionProxy("region",
pool);
+
+ when(testRegion.getServerProxy()).thenReturn(serverRegionProxy);
+ RegionAttributes<Object, Object> regionAttributes =
mock(RegionAttributes.class);
+ when(testRegion.getAttributes()).thenReturn(regionAttributes);
+ when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.DEFAULT);
+
+ createRegisterInterestTracker(pool, testRegion);
+
+ manager = new QueueManagerImpl(pool, endpoints, source, factory, 2,
+ 20, logger, ClientProxyMembershipID.getNewProxyMembership(ds));
+ manager.start(background);
+ manager.setSendClientReadyInTestOnly();
+ manager.clearQueueConnections();
+ factory.addConnection(0, 0, 4);
+ manager.recoverPrimary(excludedServers);
+
+
assertThat(opList.get(0)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+
assertThat(opList.get(1)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+
assertThat(opList.get(2)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+
assertThat(opList.get(3)).isInstanceOf(RegisterInterestListOp.RegisterInterestListOpImpl.class);
+
assertThat(opList.get(4)).isInstanceOf(ReadyForEventsOp.ReadyForEventsOpImpl.class);
+ }
+
+ private void createRegisterInterestTracker(InternalPool localPool,
+ LocalRegion localRegion) {
+ final RegisterInterestTracker registerInterestTracker =
localPool.getRITracker();
+
+ final ConcurrentHashMap<String,
RegisterInterestTracker.RegionInterestEntry> keysConcurrentMap =
+ new ConcurrentHashMap<>();
+ when(registerInterestTracker.getRegionToInterestsMap(eq(InterestType.KEY),
anyBoolean(),
+ anyBoolean())).thenReturn(
+ keysConcurrentMap);
+
+
+ for (InterestType interestType : InterestType.values()) {
+ final ConcurrentHashMap<String,
RegisterInterestTracker.RegionInterestEntry> concurrentMap =
+ new ConcurrentHashMap<>();
+ when(registerInterestTracker.getRegionToInterestsMap(eq(interestType),
anyBoolean(),
+ anyBoolean()))
+ .thenReturn(concurrentMap);
+ if (interestType.equals(InterestType.KEY)) {
+ RegisterInterestTracker.RegionInterestEntry registerInterestEntry =
+ new RegisterInterestTracker.RegionInterestEntry(localRegion);
+
+ registerInterestEntry.getInterests().put("bob",
InterestResultPolicy.NONE);
+ concurrentMap.put("testRegion", registerInterestEntry);
+ }
+ }
+ }
+
private static void assertPortEquals(int expected, Connection actual) {
assertThat(actual.getServer().getPort()).isEqualTo(expected);
}
@@ -312,7 +377,22 @@ public class QueueManagerJUnitTest {
assertThat(actualPorts).isEqualTo(expectedPorts);
}
- private class DummyPool implements InternalPool {
+ private class RecoveryTestPool extends TestPool {
+ RegisterInterestTracker registerInterestTracker =
mock(RegisterInterestTracker.class);
+
+ @Override
+ public Object executeOn(Connection con, Op op) {
+ opList.add(op);
+ return null;
+ }
+
+ @Override
+ public RegisterInterestTracker getRITracker() {
+ return registerInterestTracker;
+ }
+ }
+
+ private class TestPool implements InternalPool {
@Override
public String getPoolOrCacheCancelInProgress() {
@@ -604,19 +684,19 @@ public class QueueManagerJUnitTest {
/**
* A fake factory which returns a list of connections. Fake connections are
created by calling
* addConnection or add error. The factory maintains a queue of connections
which will be handed
- * out when the queue manager calls createClientToServerConnection. If a
error was added, the
+ * out when the queue manager calls createClientToServerConnection. If an
error was added, the
* factory will return null instead.
*/
- private class DummyFactory implements ConnectionFactory {
+ private class TestConnectionFactory implements ConnectionFactory {
- private final LinkedList<DummyConnection> nextConnections = new
LinkedList<>();
+ private final LinkedList<TestConnection> nextConnections = new
LinkedList<>();
private void addError() {
nextConnections.add(null);
}
- private DummyConnection addConnection(int endpointType, int queueSize, int
port) {
- DummyConnection connection = new DummyConnection(endpointType,
queueSize, port);
+ private TestConnection addConnection(int endpointType, int queueSize, int
port) {
+ TestConnection connection = new TestConnection(endpointType, queueSize,
port);
nextConnections.add(connection);
return connection;
}
@@ -682,7 +762,7 @@ public class QueueManagerJUnitTest {
}
}
- private class DummySource implements ConnectionSource {
+ private class TestConnectionSource implements ConnectionSource {
private final AtomicInteger nextPort = new AtomicInteger();
@@ -700,9 +780,7 @@ public class QueueManagerJUnitTest {
@Override
public List<ServerLocation> findServersForQueue(Set excludedServers, int
numServers,
ClientProxyMembershipID proxyId, boolean findDurableQueue) {
- numServers = numServers > factory.nextConnections.size()
- ? factory.nextConnections.size()
- : numServers;
+ numServers = Math.min(numServers, factory.nextConnections.size());
List<ServerLocation> locations = new ArrayList<>(numServers);
for (int i = 0; i < numServers; i++) {
locations.add(findServer(null));
@@ -736,14 +814,14 @@ public class QueueManagerJUnitTest {
}
}
- private class DummyConnection implements Connection {
+ private class TestConnection implements Connection {
private final ServerQueueStatus status;
private final ServerLocation location;
private final Endpoint endpoint;
private boolean keepAlive;
- private DummyConnection(int endpointType, int queueSize, int port) {
+ private TestConnection(int endpointType, int queueSize, int port) {
InternalDistributedMember member = new
InternalDistributedMember("localhost", 555);
status = new ServerQueueStatus((byte) endpointType, queueSize, member,
0);
location = new ServerLocation("localhost", port);
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
index 0dd3f17a96..4474b6ba88 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java
@@ -109,6 +109,10 @@ public class QueueManagerImpl implements QueueManager {
private ScheduledExecutorService recoveryThread;
private volatile boolean sentClientReady;
+ void clearQueueConnections() {
+ this.queueConnections = new ConnectionList();
+ }
+
// queueConnections in maintained by using copy-on-write
private volatile ConnectionList queueConnections = new ConnectionList();
private volatile RedundancySatisfierTask redundancySatisfierTask = null;
@@ -309,6 +313,16 @@ public class QueueManagerImpl implements QueueManager {
}
+ /*
+ * This is for test only, it does bad stuff that one should only do in test,
+ * if ever.
+ */
+ @VisibleForTesting
+ public void setSendClientReadyInTestOnly() {
+ synchronized (lock) {
+ sentClientReady = true;
+ }
+ }
@Override
public void readyForEvents(InternalDistributedSystem system) {
@@ -540,6 +554,9 @@ public class QueueManagerImpl implements QueueManager {
// couldn't find a server to make primary
break;
}
+
+ markQueueAsReadyForEvents(primaryQueue);
+
if (!addToConnectionList(primaryQueue, true)) {
excludedServers.add(primaryQueue.getServer());
primaryQueue = null;
@@ -678,21 +695,10 @@ public class QueueManagerImpl implements QueueManager {
if (recoverInterest && queueConnections.getPrimary() == null
&& queueConnections.getBackups().isEmpty()) {
// we lost our queue at some point. We Need to recover
- // interest. This server will be made primary after this method
- // finishes
- // because whoever killed the primary when this method started
- // should
+ // interest. This server will be made primary after this method
finishes
+ // because whoever killed the primary when this method started
should
// have scheduled a task to recover the primary.
isFirstNewConnection = true;
- // TODO - Actually, we need a better check than the above.
There's
- // still a chance
- // that we haven't realized that the primary has died but it is
- // already gone. We should
- // get some information from the queue server about whether it
was
- // able to copy the
- // queue from another server and decide if we need to recover our
- // interest based on
- // that information.
}
}
boolean promotionFailed = false;
@@ -727,7 +733,7 @@ public class QueueManagerImpl implements QueueManager {
}
@VisibleForTesting
- QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
+ public QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
QueueConnectionImpl primary = null;
for (int i = 0; primary == null && i < backups.size(); i++) {
QueueConnectionImpl lastConnection = (QueueConnectionImpl)
backups.get(i);
@@ -805,10 +811,13 @@ public class QueueManagerImpl implements QueueManager {
excludedServers.addAll(servers);
}
- if (primary != null && sentClientReady && primary.sendClientReady()) {
+ return primary;
+ }
+
+ public void markQueueAsReadyForEvents(@NotNull QueueConnectionImpl primary) {
+ if (sentClientReady && primary.sendClientReady()) {
readyForEventsAfterFailover(primary);
}
- return primary;
}
private List<ServerLocation> findQueueServers(Set<ServerLocation>
excludedServers, int count,
@@ -848,7 +857,7 @@ public class QueueManagerImpl implements QueueManager {
* to find a new server.
*/
@VisibleForTesting
- void recoverPrimary(Set<ServerLocation> excludedServers) {
+ public void recoverPrimary(Set<ServerLocation> excludedServers) {
if (pool.getPoolOrCacheCancelInProgress() != null) {
return;
}
@@ -888,7 +897,6 @@ public class QueueManagerImpl implements QueueManager {
}
newPrimary = null;
}
-
}
if (newPrimary != null) {
@@ -915,6 +923,7 @@ public class QueueManagerImpl implements QueueManager {
// could not find a new primary to create
break;
}
+
if (!addToConnectionList(newPrimary, true)) {
excludedServers.add(newPrimary.getServer());
newPrimary = null;
@@ -931,6 +940,9 @@ public class QueueManagerImpl implements QueueManager {
excludedServers.add(newPrimary.getServer());
newPrimary = null;
}
+
+ markQueueAsReadyForEvents(newPrimary);
+
// New primary queue was found from a non backup, alert the affected
cqs
cqsConnected();
}
@@ -986,7 +998,7 @@ public class QueueManagerImpl implements QueueManager {
// so before putting connection need to see if something(crash) happen we
should be able to
// recover from it
@VisibleForTesting
- boolean addToConnectionList(QueueConnectionImpl connection, boolean
isPrimary) {
+ public boolean addToConnectionList(QueueConnectionImpl connection, boolean
isPrimary) {
boolean isBadConnection;
synchronized (lock) {
ClientUpdater cu = connection.getUpdater();
@@ -1029,7 +1041,7 @@ public class QueueManagerImpl implements QueueManager {
}
@VisibleForTesting
- void scheduleRedundancySatisfierIfNeeded(long delay) {
+ public void scheduleRedundancySatisfierIfNeeded(long delay) {
if (shuttingDown) {
return;
}
@@ -1111,10 +1123,9 @@ public class QueueManagerImpl implements QueueManager {
}
private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
- Map cqs = getPool().getRITracker().getCqsMap();
- for (Object o : cqs.entrySet()) {
- Map.Entry e = (Map.Entry) o;
- ClientCQ cqi = (ClientCQ) e.getKey();
+ Map<ClientCQ, Boolean> cqs = getPool().getRITracker().getCqsMap();
+ for (Map.Entry<ClientCQ, Boolean> e : cqs.entrySet()) {
+ ClientCQ cqi = e.getKey();
String name = cqi.getName();
if (pool.getMultiuserAuthentication()) {
UserAttributes.userAttributes
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
index d93a69ca99..2d8f1515c0 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ReadyForEventsOp.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.client.internal;
import org.jetbrains.annotations.NotNull;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -40,7 +41,8 @@ public class ReadyForEventsOp {
// no instances allowed
}
- private static class ReadyForEventsOpImpl extends AbstractOp {
+ @VisibleForTesting
+ public static class ReadyForEventsOpImpl extends AbstractOp {
/**
* @throws org.apache.geode.SerializationException if serialization fails
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
index dc7fb3c001..33ee30a7ff 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestListOp.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.jetbrains.annotations.NotNull;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.InterestResultPolicy;
import
org.apache.geode.cache.client.internal.RegisterInterestOp.RegisterInterestOpImpl;
@@ -105,7 +106,8 @@ public class RegisterInterestListOp {
return uncheckedCast(pool.executeOn(conn, op));
}
- private static class RegisterInterestListOpImpl extends
RegisterInterestOpImpl {
+ @VisibleForTesting
+ public static class RegisterInterestListOpImpl extends
RegisterInterestOpImpl {
/**
* @throws org.apache.geode.SerializationException if serialization fails
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
index e0d1bb1bff..8a08a81ce6 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/RegisterInterestTracker.java
@@ -22,14 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.apache.geode.InternalGemFireError;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.query.CqQuery;
-import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
+import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.UnregisterAllInterest;
@@ -52,7 +53,7 @@ public class RegisterInterestTracker {
private final FailoverInterestList[] failoverInterestLists = new
FailoverInterestList[4];
/** Manages CQs */
- private final ConcurrentMap<CqQuery, Boolean> cqs = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<ClientCQ, Boolean> cqs = new
ConcurrentHashMap<>();
public RegisterInterestTracker() {
failoverInterestLists[interestListIndex] = new FailoverInterestList();
@@ -106,7 +107,8 @@ public class RegisterInterestTracker {
return result;
}
- void addSingleInterest(final @NotNull LocalRegion r, final @NotNull Object
key,
+ @VisibleForTesting
+ public void addSingleInterest(final @NotNull LocalRegion r, final @NotNull
Object key,
final @NotNull InterestType interestType,
final @NotNull InterestResultPolicy pol, final boolean isDurable,
boolean receiveUpdatesAsInvalidates) {
@@ -146,15 +148,15 @@ public class RegisterInterestTracker {
}
}
- public void addCq(InternalCqQuery cqi, boolean isDurable) {
+ public void addCq(ClientCQ cqi, boolean isDurable) {
cqs.put(cqi, isDurable);
}
- public void removeCq(InternalCqQuery cqi) {
+ public void removeCq(ClientCQ cqi) {
cqs.remove(cqi);
}
- Map<CqQuery, Boolean> getCqsMap() {
+ Map<ClientCQ, Boolean> getCqsMap() {
return cqs;
}
@@ -302,6 +304,42 @@ public class RegisterInterestTracker {
return mapOfInterest.get(regionName);
}
+ /**
+ * Iterate InterestTypes searching for any with the input
interestResultPolicy
+ *
+ * @return boolean based on the presence of the selected InterestResultPolicy
+ */
+ public boolean hasInterestsWithResultPolicy(final @NotNull String
regionName, boolean isDurable,
+ final @NotNull InterestResultPolicy interestResultPolicy) {
+ return Stream.of(InterestType.values())
+ .anyMatch(interestType -> hasInterestsWithResultPolicy(regionName,
isDurable,
+ interestResultPolicy, interestType));
+ }
+
+ /**
+ * Check the RegionInterestEntries with receiveUpdatesAsInvalidates both
true and false
+ *
+ * @return boolean based on the presence of the selected InterestResultPolicy
+ */
+ public boolean hasInterestsWithResultPolicy(final @NotNull String
regionName, boolean isDurable,
+ final @NotNull InterestResultPolicy interestResultPolicy,
+ final @NotNull InterestType interestType) {
+
+ return Stream.of(true, false)
+ .anyMatch(receiveUpdatesAsInvalidates ->
hasInterestsWithResultPolicy(regionName, isDurable,
+ interestResultPolicy, interestType, receiveUpdatesAsInvalidates));
+ }
+
+ private boolean hasInterestsWithResultPolicy(final @NotNull String
regionName, boolean isDurable,
+ final @NotNull InterestResultPolicy interestResultPolicy,
+ final @NotNull InterestType interestType, boolean
receiveUpdatesAsInvalidates) {
+ RegionInterestEntry regionInterestEntry =
+ readRegionInterests(regionName, interestType, isDurable,
receiveUpdatesAsInvalidates);
+ return regionInterestEntry != null &&
regionInterestEntry.getInterests().values().stream()
+ .anyMatch(actualInterestResultPolicy -> actualInterestResultPolicy
+ .getOrdinal() == interestResultPolicy.getOrdinal());
+ }
+
/**
* A Holder object for client's register interest, this is required when a
client fails over to
* another server and does register interest based on this Data structure
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index ba4119903b..5b57d77649 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -83,7 +83,7 @@ public class ServerRegionProxy extends ServerProxy implements
ServerRegionDataAc
* Used by tests to create proxies for "fake" regions. Also, used by
ClientStatsManager for admin
* region.
*/
- public ServerRegionProxy(String regionName, PoolImpl pool) {
+ public ServerRegionProxy(String regionName, InternalPool pool) {
super(pool);
region = null;
this.regionName = regionName;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8e7cf5e581..dbc6006088 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5335,7 +5335,7 @@ public class LocalRegion extends AbstractRegion
implements LoaderHelperFactory,
// If the marker has been processed, process this put event normally;
// otherwise, this event occurred in the past and has been stored for a
// durable client. In this case, just invoke the put callbacks.
- if (processedMarker) {
+ if (processedMarker || hasRegisterInterestsWithResultPolicy(true,
InterestResultPolicy.NONE)) {
boolean ifNew = false; // can overwrite an existing key
boolean ifOld = false; // can create a new key
long lastModified = 0L; // use now
@@ -5352,6 +5352,12 @@ public class LocalRegion extends AbstractRegion
implements LoaderHelperFactory,
}
}
+ private boolean hasRegisterInterestsWithResultPolicy(boolean isDurable,
+ InterestResultPolicy interestResultPolicy) {
+ return
getServerProxy().getPool().getRITracker().hasInterestsWithResultPolicy(fullPath,
+ isDurable, interestResultPolicy);
+ }
+
/**
* Perform an invalidate in a bridge client. The op is from the cache server
and should not be
* distributed back to it.
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
new file mode 100644
index 0000000000..16d5eb34e6
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionUpdateUnitTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static
org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Function;
+
+import org.junit.Rule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.RegisterInterestTracker;
+import org.apache.geode.cache.client.internal.ServerRegionProxy;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.tier.InterestType;
+
+public class LocalRegionUpdateUnitTest {
+ private InternalDataView internalDataView;
+ private LocalRegion region;
+ private RegisterInterestTracker registerInterestTracker;
+
+ @Rule
+ public MockitoRule mockitoRule =
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
+ @BeforeEach
+ public void setUp() {
+ internalDataView = mock(InternalDataView.class);
+
+ EntryEventFactory entryEventFactory = mock(EntryEventFactory.class);
+ InternalCache cache = mock(InternalCache.class);
+ InternalDistributedSystem internalDistributedSystem =
mock(InternalDistributedSystem.class);
+ InternalRegionArguments internalRegionArguments =
mock(InternalRegionArguments.class);
+
+ LocalRegion.RegionMapConstructor regionMapConstructor =
+ mock(LocalRegion.RegionMapConstructor.class);
+ Function<LocalRegion, RegionPerfStats> regionPerfStatsFactory =
localRegion -> {
+ localRegion.getLocalSize();
+ return mock(RegionPerfStats.class);
+ };
+
+ final PoolImpl poolImpl = mock(PoolImpl.class);
+ SubscriptionAttributes subscriptionAttributes =
+ new SubscriptionAttributes(InterestPolicy.ALL);
+
+
+
when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getClock()).thenReturn(mock(DSClock.class));
+
+ when(regionMapConstructor.create(any(), any(),
any())).thenReturn(mock(RegionMap.class));
+
+ InternalRegionFactory<Object, Object> regionFactory = new
InternalRegionFactory<>(cache);
+ regionFactory.setDataPolicy(DataPolicy.NORMAL);
+ regionFactory.setPoolName("Pool1");
+ regionFactory.setConcurrencyChecksEnabled(false);
+ regionFactory.setSubscriptionAttributes(subscriptionAttributes);
+ RegionAttributes<Object, Object> regionAttributes =
+ regionFactory.getCreateAttributes();
+
+ registerInterestTracker = new RegisterInterestTracker();
+ when(poolImpl.getRITracker()).thenReturn(registerInterestTracker);
+
+ ServerRegionProxy serverRegionProxy = mock(ServerRegionProxy.class);
+ when(serverRegionProxy.getPool()).thenReturn(poolImpl);
+ LocalRegion.ServerRegionProxyConstructor proxyConstructor = region ->
serverRegionProxy;
+
+ AbstractRegion.PoolFinder poolFinder = poolName -> poolImpl;
+
+ region = spy(new LocalRegion("region", regionAttributes, null, cache,
+ internalRegionArguments, internalDataView, regionMapConstructor,
proxyConstructor,
+ entryEventFactory, poolFinder, regionPerfStatsFactory,
disabledClock()));
+
+ }
+
+ /*
+ * As indicated by the name this code tests the basicBridgeClientUpdate
method's
+ * fork where it checks to see if the interestResultPolicy is NONE, then it
will
+ * call basicUpdate.
+ */
+ @Test
+ public void
basicBridgeClientUpdateChecksInterestResultPolicyNoneThenUpdates() {
+ Object key = new Object();
+ Object value = new Object();
+
+ registerInterestTracker.addSingleInterest(region, key, InterestType.KEY,
+ InterestResultPolicy.NONE, true, false);
+
+ region.basicBridgeClientUpdate(null, key, value, new byte[] {'0'}, true,
+ null, true, false, new EntryEventImpl(),
+ new EventID(new byte[] {1}, 1, 1));
+
+ verify(internalDataView).putEntry(any(), anyBoolean(),
+ anyBoolean(), any(), anyBoolean(), anyLong(), anyBoolean(),
anyBoolean(), anyBoolean());
+
+ }
+
+ /*
+ * As indicated by the name this code tests the basicBridgeClientUpdate
method's
+ * fork where it checks to see if the interestResultPolicy is KEY, then it
will
+ * not call basicUpdate.
+ */
+ @Test
+ public void
basicBridgeClientUpdateChecksInterestResultPolicyKeyThenDoesNotUpdate() {
+ Object key = new Object();
+ Object value = new Object();
+
+ registerInterestTracker.addSingleInterest(region, key, InterestType.KEY,
+ InterestResultPolicy.KEYS, true, false);
+
+ region.basicBridgeClientUpdate(null, key, value, new byte[] {'0'}, true,
+ null, true, false, new EntryEventImpl(),
+ new EventID(new byte[] {1}, 1, 1));
+
+ verify(internalDataView, times(0)).putEntry(any(), anyBoolean(),
+ anyBoolean(), any(), anyBoolean(), anyLong(), anyBoolean(),
anyBoolean(), anyBoolean());
+
+ }
+
+}
diff --git
a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
index c8e23e36bd..2bfcbf1ede 100644
---
a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
+++
b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
@@ -62,7 +62,7 @@ import org.apache.geode.test.version.VersionManager;
* instead.
*/
public abstract class JUnit4DistributedTestCase implements
DistributedTestFixture, Serializable {
- private static final Logger logger = LogService.getLogger();
+ protected static final Logger logger = LogService.getLogger();
/** This VM's connection to the distributed system */
protected static InternalDistributedSystem system;