This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e25fba2a13 GEODE-10271: ConnectionProxyJUnitTest cleanup (#7652)
e25fba2a13 is described below
commit e25fba2a13f15e77af6aafabbe6921b919b95d19
Author: Jinmei Liao <[email protected]>
AuthorDate: Wed May 11 10:48:23 2022 -0700
GEODE-10271: ConnectionProxyJUnitTest cleanup (#7652)
---
.../sockets/ConnectionProxyIntegrationTest.java | 395 +++++++++++
.../tier/sockets/ConnectionProxyJUnitTest.java | 771 ---------------------
2 files changed, 395 insertions(+), 771 deletions(-)
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java
new file mode 100644
index 0000000000..212c983bfb
--- /dev/null
+++
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+/*
+ * Created on Feb 3, 2006
+ *
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.cache.client.PoolManager.createFactory;
+import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+/**
+ * Tests the functionality of operations of AbstractConnectionProxy & its
derived classes.
+ */
+@Category({ClientSubscriptionTest.class})
+public class ConnectionProxyIntegrationTest {
+ DistributedSystem system;
+ Cache cache;
+ PoolImpl proxy = null;
+ SequenceIdAndExpirationObject seo = null;
+ CacheServer server = null;
+
+ final Duration timeoutToVerifyExpiry = Duration.ofSeconds(30);
+ final Duration timeoutToVerifyAckSend = Duration.ofSeconds(30);
+
+ @Rule
+ public ServerStarterRule serverStarter =
+ new ServerStarterRule().withNoCacheServer().withAutoStart();
+
+ @Before
+ public void setUp() throws Exception {
+ cache = serverStarter.getCache();
+ system = cache.getDistributedSystem();
+ }
+
+ @After
+ public void after() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Test
+ public void connectedServerCount() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(false);
+ poolFactory.setReadTimeout(2000);
+ poolFactory.setMinConnections(1);
+ poolFactory.setSocketBufferSize(32768);
+ poolFactory.setRetryAttempts(1);
+ poolFactory.setPingInterval(500);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ assertThatThrownBy(() -> proxy.acquireConnection())
+ .isInstanceOf(NoAvailableServersException.class);
+
+ assertThat(proxy.getConnectedServerCount()).isEqualTo(0);
+ addCacheServer(port3, 15000);
+ await().untilAsserted(() -> {
+ assertThat(proxy.getConnectedServerCount()).isEqualTo(1);
+ });
+
+ }
+
+ @Test
+ public void threadIdToSequenceIdMapCreation() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+ assertThat(proxy.getThreadIdToSequenceIdMap()).isNotNull();
+ }
+
+ @Test
+ public void threadIdToSequenceIdMapExpiryPositive() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(4000);
+ poolFactory.setSubscriptionAckInterval(2000);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ EventID eventID = new EventID(new byte[0], 1, 1);
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs(" eventID should not be duplicate as it is a new entry")
+ .isFalse();
+
+ verifyExpiry();
+
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs(" eventID should not be duplicate as it is a new entry")
+ .isFalse();
+ }
+
+
+ @Test
+ public void threadIdToSequenceIdMapExpiryNegative() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(10000);
+
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ final EventID eventID = new EventID(new byte[0], 1, 1);
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs(" eventID should not be duplicate as it is a new entry")
+ .isFalse();
+
+ await().untilAsserted(() ->
assertThat(proxy.verifyIfDuplicate(eventID)).isTrue());
+ }
+
+ @Test
+ public void threadIdToSequenceIdMapConcurrency() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(5000);
+ poolFactory.setSubscriptionAckInterval(2000);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ final int EVENT_ID_COUNT = 10000; // why 10,000?
+ EventID[] eventIds = new EventID[EVENT_ID_COUNT];
+ for (int i = 0; i < EVENT_ID_COUNT; i++) {
+ eventIds[i] = new EventID(new byte[0], i, i);
+ assertThat(proxy.verifyIfDuplicate(eventIds[i]))
+ .describedAs("eventIds can never be duplicate, it is being created
for the first time!")
+ .isFalse();
+ }
+ verifyExpiry();
+
+ for (int i = 0; i < EVENT_ID_COUNT; i++) {
+ assertThat(proxy.verifyIfDuplicate(eventIds[i]))
+ .describedAs(
+ "eventIds can not be found to be duplicate since the entry
should have expired! " + i)
+ .isFalse();
+ }
+ }
+
+
+ @Test
+ public void duplicateSeqIdLesserThanCurrentSeqIdBeingIgnored() throws
Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(100000);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ EventID eventId1 = new EventID(new byte[0], 1, 5);
+ assertThat(proxy.verifyIfDuplicate(eventId1))
+ .describedAs("eventId1 can never be duplicate, it is being created for
the first time!")
+ .isFalse();
+
+ EventID eventId2 = new EventID(new byte[0], 1, 2);
+
+ assertThat(proxy.verifyIfDuplicate(eventId2))
+ .describedAs("eventId2 should be duplicate, seqId is less than highest
(5)")
+ .isTrue();
+
+ EventID eventId3 = new EventID(new byte[0], 1, 3);
+
+ assertThat(proxy.verifyIfDuplicate(eventId3))
+ .describedAs("eventId3 should be duplicate, seqId is less than highest
(5)")
+ .isTrue();
+
+ assertThat(!proxy.getThreadIdToSequenceIdMap().isEmpty()).isTrue();
+ proxy.destroy();
+ }
+
+
+ @Test
+ public void cleanCloseOfThreadIdToSeqId() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(100000);
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ EventID eventID1 = new EventID(new byte[0], 1, 2);
+
+ assertThat(proxy.verifyIfDuplicate(eventID1))
+ .describedAs("eventID1 can never be duplicate, it is being created for
the first time!")
+ .isFalse();
+
+ EventID eventID2 = new EventID(new byte[0], 1, 3);
+ assertThat(proxy.verifyIfDuplicate(eventID2))
+ .describedAs("eventID2 can never be duplicate, since sequenceId is
greater ")
+ .isFalse();
+
+ assertThat(proxy.verifyIfDuplicate(eventID2))
+ .describedAs("eventID2 had to be a duplicate, since sequenceId is
equal ")
+ .isTrue();
+
+ EventID eventID3 = new EventID(new byte[0], 1, 1);
+ assertThat(proxy.verifyIfDuplicate(eventID3))
+ .describedAs("eventId3 had to be a duplicate, since sequenceId is
lesser")
+ .isTrue();
+ }
+
+ @Test
+ public void twoClientsHavingDifferentThreadIdMaps() throws Exception {
+ int port3 = getRandomAvailableTCPPort();
+ addCacheServer(port3, 10000);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port3);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(-1);
+ poolFactory.setSubscriptionMessageTrackingTimeout(100000);
+
+ PoolImpl proxy1 = (PoolImpl) poolFactory.create("clientPool1");
+ try {
+ PoolImpl proxy2 = (PoolImpl) poolFactory.create("clientPool2");
+ try {
+ Map map1 = proxy1.getThreadIdToSequenceIdMap();
+ Map map2 = proxy2.getThreadIdToSequenceIdMap();
+ assertThat(map1 == map2).isFalse();
+ } finally {
+ proxy2.destroy();
+ }
+ } finally {
+ proxy1.destroy();
+ }
+ }
+
+ @Test
+ public void periodicAckSendByClient() throws Exception {
+ int port = getRandomAvailableTCPPort();
+ addCacheServer(port, null);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(1);
+ poolFactory.setReadTimeout(20000);
+ poolFactory.setSubscriptionMessageTrackingTimeout(15000);
+ poolFactory.setSubscriptionAckInterval(5000);
+
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ EventID eventID = new EventID(new byte[0], 1, 1);
+
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs("eventID should not be duplicate as it is a new entry")
+ .isFalse();
+
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ assertThat(seo.getAckSend()).isFalse();
+
+ // should send the ack to server
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ verifyAckSend(true);
+
+ // New update on same threadId
+ eventID = new EventID(new byte[0], 1, 2);
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs("eventID should not be duplicate as it is a new entry")
+ .isFalse();
+
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ assertThat(seo.getAckSend()).isFalse();
+
+ // should send another ack to server
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ verifyAckSend(true);
+
+ // should expire with the this mentioned.
+ verifyExpiry();
+ }
+
+ // No ack will be send if Redundancy level = 0
+ @Test
+ public void noAckSendByClient() throws Exception {
+ int port = getRandomAvailableTCPPort();
+ addCacheServer(port, null);
+
+ PoolFactory poolFactory = PoolManager.createFactory();
+ poolFactory.addServer("localhost", port);
+ poolFactory.setSubscriptionEnabled(true);
+ poolFactory.setSubscriptionRedundancy(1);
+ poolFactory.setReadTimeout(20000);
+ poolFactory.setSubscriptionMessageTrackingTimeout(8000);
+ poolFactory.setSubscriptionAckInterval(2000);
+
+ proxy = (PoolImpl) poolFactory.create("clientPool");
+
+ EventID eventID = new EventID(new byte[0], 1, 1);
+ assertThat(proxy.verifyIfDuplicate(eventID))
+ .describedAs("eventID should not be duplicate as it is a new entry")
+ .isFalse();
+
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ assertThat(seo.getAckSend()).isFalse();
+
+ // should not send an ack as redundancy level = 0;
+ seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap()
+ .get(new ThreadIdentifier(new byte[0], 1));
+ verifyAckSend(false);
+
+ // should expire without sending an ack as redundancy level = 0.
+ verifyExpiry();
+ }
+
+ private void verifyAckSend(final boolean expectedAckSend) {
+ await().timeout(timeoutToVerifyAckSend).untilAsserted(() -> {
+ assertThat(seo.getAckSend()).isEqualTo(expectedAckSend);
+ });
+ }
+
+ private void verifyExpiry() {
+ await().timeout(timeoutToVerifyExpiry).untilAsserted(() -> {
+ assertThat(proxy.getThreadIdToSequenceIdMap().size()).isEqualTo(0);
+ });
+ }
+
+ // start the server
+ private void addCacheServer(int serverPort, Integer maxBetweenPings) throws
IOException {
+ server = cache.addCacheServer();
+ if (maxBetweenPings != null) {
+ server.setMaximumTimeBetweenPings(maxBetweenPings);
+ }
+ server.setPort(serverPort);
+ server.start();
+ }
+}
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java
deleted file mode 100644
index d08606ad09..0000000000
---
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-/*
- * Created on Feb 3, 2006
- *
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static org.apache.geode.cache.client.PoolManager.createFactory;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.client.PoolFactory;
-import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.internal.Connection;
-import org.apache.geode.cache.client.internal.PoolImpl;
-import org.apache.geode.cache.client.internal.PutOp;
-import
org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-
-/**
- *
- * Tests the functionality of operations of AbstractConnectionProxy & its
derived classes.
- */
-@Category({ClientSubscriptionTest.class})
-public class ConnectionProxyJUnitTest {
- private static final String expectedRedundantErrorMsg =
- "Could not find any server to host redundant client queue.";
- private static final String expectedPrimaryErrorMsg =
- "Could not find any server to host primary client queue.";
-
- DistributedSystem system;
-
- Cache cache;
-
- PoolImpl proxy = null;
-
- SequenceIdAndExpirationObject seo = null;
-
- final Duration timeoutToVerifyExpiry = Duration.ofSeconds(30);
- final Duration timeoutToVerifyAckSend = Duration.ofSeconds(30);
-
- @Before
- public void setUp() throws Exception {
-
- Properties p = new Properties();
- p.setProperty(MCAST_PORT, "0");
- p.setProperty(LOCATORS, "");
- system = DistributedSystem.connect(p);
- cache = CacheFactory.create(system);
- final String addExpectedPEM =
- "<ExpectedException action=add>" + expectedPrimaryErrorMsg +
"</ExpectedException>";
- final String addExpectedREM =
- "<ExpectedException action=add>" + expectedRedundantErrorMsg +
"</ExpectedException>";
- system.getLogWriter().info(addExpectedPEM);
- system.getLogWriter().info(addExpectedREM);
- }
-
- @After
- public void tearDown() throws Exception {
- cache.close();
-
- final String removeExpectedPEM =
- "<ExpectedException action=remove>" + expectedPrimaryErrorMsg +
"</ExpectedException>";
- final String removeExpectedREM =
- "<ExpectedException action=remove>" + expectedRedundantErrorMsg +
"</ExpectedException>";
-
- system.getLogWriter().info(removeExpectedPEM);
- system.getLogWriter().info(removeExpectedREM);
-
- system.disconnect();
- if (proxy != null) {
- proxy.destroy();
- }
- }
-
- /**
- * This test verifies the behaviour of client request when the listener on
the server sits
- * forever. This is done in following steps:<br>
- * 1)create server<br>
- * 2)initialize proxy object and create region for client having a
CacheListener and make
- * afterCreate in the listener to wait infinitely<br>
- * 3)perform a PUT on client by acquiring Connection through proxy<br>
- * 4)Verify that exception occurs due to infinite wait in the listener<br>
- * 5)Verify that above exception occurs sometime after the readTimeout
configured for the client
- * <br>
- *
- */
- @Ignore
- @Test
- public void testListenerOnServerSitForever() throws Exception {
- int port3 = getRandomAvailableTCPPort();
- Region testRegion = null;
-
- CacheServer server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
-
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(false);
- pf.setSubscriptionRedundancy(-1);
- pf.setReadTimeout(2000);
- pf.setSocketBufferSize(32768);
- pf.setRetryAttempts(1);
- pf.setPingInterval(10000);
-
- proxy = (PoolImpl) pf.create("clientPool");
-
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setCacheListener(new CacheListenerAdapter() {
- @Override
- public void afterCreate(EntryEvent event) {
- synchronized (ConnectionProxyJUnitTest.this) {
- try {
- ConnectionProxyJUnitTest.this.wait();
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- }
- }
- });
- RegionAttributes attrs = factory.create();
- testRegion = cache.createRegion("testregion", attrs);
-
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- Connection conn = (proxy).acquireConnection();
- long t1 = 0;
- try {
- t1 = System.currentTimeMillis();
- EntryEventImpl event = new EntryEventImpl((Object) null, false);
- try {
- event.setEventId(new EventID(new byte[] {1}, 1, 1));
- PutOp.execute(conn, proxy, testRegion.getFullPath(), "key1", "val1",
event, null, false);
- } finally {
- event.release();
- }
- fail("Test failed as exception was expected");
- } catch (Exception e) {
- long t2 = System.currentTimeMillis();
- long net = (t2 - t1);
- assertTrue(net / 1000 < 5);
- }
- synchronized (this) {
- notify();
- }
- }
-
- /**
- * Tests the DeadServerMonitor when identifying an Endpoint as alive , does
not create a
- * persistent Ping connection ( i.e sends a CLOSE protocol , if the number
of connections is zero.
- */
- @Test
- public void testDeadServerMonitorPingNature1() {
- int port3 = getRandomAvailableTCPPort();
-
- // final int maxWaitTime = 10000;
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(false);
- pf.setReadTimeout(2000);
- pf.setMinConnections(1);
- pf.setSocketBufferSize(32768);
- pf.setRetryAttempts(1);
- pf.setPingInterval(500);
-
- proxy = (PoolImpl) pf.create("clientPool");
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- try {
- (proxy).acquireConnection();
- } catch (Exception ok) {
- ok.printStackTrace();
- }
-
- try {
- (proxy).acquireConnection();
- } catch (Exception ok) {
- ok.printStackTrace();
- }
-
- // long start = System.currentTimeMillis();
- assertEquals(0, proxy.getConnectedServerCount());
- // start the server
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(15000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- GeodeAwaitility.await().untilAsserted(() -> {
- assertEquals(1, proxy.getConnectedServerCount());
- });
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- /**
- * Tests the DeadServerMonitor when identifying an Endpoint as alive , does
creates a persistent
- * Ping connection ( i.e sends a PING protocol , if the number of
connections is more than zero.
- */
- @Test
- public void testDeadServerMonitorPingNature2() {
- int port3 = getRandomAvailableTCPPort();
-
- // final int maxWaitTime = 10000;
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(false);
- pf.setReadTimeout(2000);
- pf.setMinConnections(1);
- pf.setSocketBufferSize(32768);
- pf.setRetryAttempts(1);
- pf.setPingInterval(500);
- proxy = (PoolImpl) pf.create("clientPool");
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- // let LiveServerMonitor detect it as alive as the numConnection is more
than zero
-
- // long start = System.currentTimeMillis();
- assertEquals(0, proxy.getConnectedServerCount());
- // start the server
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(15000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- GeodeAwaitility.await().untilAsserted(() -> {
- assertEquals(1, proxy.getConnectedServerCount());
- });
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- @Test
- public void testThreadIdToSequenceIdMapCreation() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- proxy = (PoolImpl) pf.create("clientPool");
- if (proxy.getThreadIdToSequenceIdMap() == null) {
- fail(" ThreadIdToSequenceIdMap is null. ");
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ie) {
- fail("interrupted");
- }
- server.stop();
- }
- }
- }
-
- @Test
- public void testThreadIdToSequenceIdMapExpiryPositive() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(4000);
- pf.setSubscriptionAckInterval(2000);
- proxy = (PoolImpl) pf.create("clientPool");
-
- EventID eid = new EventID(new byte[0], 1, 1);
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as it is a new entry");
- }
-
- verifyExpiry();
-
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as the previous entry should have
expired ");
- }
-
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
-
- @Test
- public void testThreadIdToSequenceIdMapExpiryNegative() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(10000);
-
- proxy = (PoolImpl) pf.create("clientPool");
-
- final EventID eid = new EventID(new byte[0], 1, 1);
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as it is a new entry");
- }
-
- GeodeAwaitility.await().untilAsserted(() ->
assertTrue(proxy.verifyIfDuplicate(eid)));
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- @Test
- public void testThreadIdToSequenceIdMapConcurrency() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(5000);
- pf.setSubscriptionAckInterval(2000);
- proxy = (PoolImpl) pf.create("clientPool");
-
- final int EVENT_ID_COUNT = 10000; // why 10,000?
- EventID[] eid = new EventID[EVENT_ID_COUNT];
- for (int i = 0; i < EVENT_ID_COUNT; i++) {
- eid[i] = new EventID(new byte[0], i, i);
- if (proxy.verifyIfDuplicate(eid[i])) {
- fail(" eid can never be duplicate, it is being created for the
first time! ");
- }
- }
- verifyExpiry();
-
- for (int i = 0; i < EVENT_ID_COUNT; i++) {
- if (proxy.verifyIfDuplicate(eid[i])) {
- fail(
- " eid can not be found to be duplicate since the entry should
have expired! " + i);
- }
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
-
-
- @Test
- public void testDuplicateSeqIdLesserThanCurrentSeqIdBeingIgnored() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(100000);
- proxy = (PoolImpl) pf.create("clientPool");
-
- EventID eid1 = new EventID(new byte[0], 1, 5);
- if (proxy.verifyIfDuplicate(eid1)) {
- fail(" eid1 can never be duplicate, it is being created for the
first time! ");
- }
-
- EventID eid2 = new EventID(new byte[0], 1, 2);
-
- if (!proxy.verifyIfDuplicate(eid2)) {
- fail(" eid2 should be duplicate, seqId is less than highest (5)");
- }
-
- EventID eid3 = new EventID(new byte[0], 1, 3);
-
- if (!proxy.verifyIfDuplicate(eid3)) {
- fail(" eid3 should be duplicate, seqId is less than highest (5)");
- }
-
- assertTrue(!proxy.getThreadIdToSequenceIdMap().isEmpty());
- proxy.destroy();
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
-
-
- @Test
- public void testCleanCloseOfThreadIdToSeqId() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(100000);
- proxy = (PoolImpl) pf.create("clientPool");
-
- EventID eid1 = new EventID(new byte[0], 1, 2);
- if (proxy.verifyIfDuplicate(eid1)) {
- fail(" eid can never be duplicate, it is being created for the first
time! ");
- }
- EventID eid2 = new EventID(new byte[0], 1, 3);
- if (proxy.verifyIfDuplicate(eid2)) {
- fail(" eid can never be duplicate, since sequenceId is greater ");
- }
-
- if (!proxy.verifyIfDuplicate(eid2)) {
- fail(" eid had to be a duplicate, since sequenceId is equal ");
- }
- EventID eid3 = new EventID(new byte[0], 1, 1);
- if (!proxy.verifyIfDuplicate(eid3)) {
- fail(" eid had to be a duplicate, since sequenceId is lesser ");
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ie) {
- fail("interrupted");
- }
- server.stop();
- }
- }
- }
-
- @Test
- public void testTwoClientsHavingDifferentThreadIdMaps() {
- int port3 = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setMaximumTimeBetweenPings(10000);
- server.setPort(port3);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port3);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(-1);
- pf.setSubscriptionMessageTrackingTimeout(100000);
-
- PoolImpl proxy1 = (PoolImpl) pf.create("clientPool1");
- try {
- PoolImpl proxy2 = (PoolImpl) pf.create("clientPool2");
- try {
-
- Map map1 = proxy1.getThreadIdToSequenceIdMap();
- Map map2 = proxy2.getThreadIdToSequenceIdMap();
-
- assertTrue(!(map1 == map2));
-
- } finally {
- proxy2.destroy();
- }
- } finally {
- proxy1.destroy();
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Failed to initialize client");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- @Test
- public void testPeriodicAckSendByClient() {
- int port = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setPort(port);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(1);
- pf.setReadTimeout(20000);
- pf.setSubscriptionMessageTrackingTimeout(15000);
- pf.setSubscriptionAckInterval(5000);
-
- proxy = (PoolImpl) pf.create("clientPool");
-
- EventID eid = new EventID(new byte[0], 1, 1);
-
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as it is a new entry");
- }
-
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- assertFalse(seo.getAckSend());
-
- // should send the ack to server
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- verifyAckSend(true);
-
- // New update on same threadId
- eid = new EventID(new byte[0], 1, 2);
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as it is a new entry");
- }
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- assertFalse(seo.getAckSend());
-
- // should send another ack to server
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- verifyAckSend(true);
-
- // should expire with the this mentioned.
- verifyExpiry();
- } catch (Exception ex) {
- ex.printStackTrace();
- fail("Test testPeriodicAckSendByClient Failed");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- // No ack will be send if Redundancy level = 0
- @Test
- public void testNoAckSendByClient() {
- int port = getRandomAvailableTCPPort();
- CacheServer server = null;
- try {
- try {
- server = cache.addCacheServer();
- server.setPort(port);
- server.start();
- } catch (Exception e) {
- e.printStackTrace();
- fail("Failed to create server");
- }
- try {
- PoolFactory pf = PoolManager.createFactory();
- pf.addServer("localhost", port);
- pf.setSubscriptionEnabled(true);
- pf.setSubscriptionRedundancy(1);
- pf.setReadTimeout(20000);
- pf.setSubscriptionMessageTrackingTimeout(8000);
- pf.setSubscriptionAckInterval(2000);
-
- proxy = (PoolImpl) pf.create("clientPool");
-
- EventID eid = new EventID(new byte[0], 1, 1);
-
- if (proxy.verifyIfDuplicate(eid)) {
- fail(" eid should not be duplicate as it is a new entry");
- }
-
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- assertFalse(seo.getAckSend());
-
- // should not send an ack as redundancy level = 0;
- seo = (SequenceIdAndExpirationObject)
proxy.getThreadIdToSequenceIdMap()
- .get(new ThreadIdentifier(new byte[0], 1));
- verifyAckSend(false);
-
- // should expire without sending an ack as redundancy level = 0.
- verifyExpiry();
- }
-
- catch (Exception ex) {
- ex.printStackTrace();
- fail("Test testPeriodicAckSendByClient Failed");
- }
- } finally {
- if (server != null) {
- server.stop();
- }
- }
- }
-
- private void verifyAckSend(final boolean expectedAckSend) {
- GeodeAwaitility.await().timeout(timeoutToVerifyAckSend).untilAsserted(()
-> {
- assertEquals(expectedAckSend, seo.getAckSend());
- });
- }
-
- private void verifyExpiry() {
- GeodeAwaitility.await().timeout(timeoutToVerifyExpiry).untilAsserted(() ->
{
- assertEquals(0, proxy.getThreadIdToSequenceIdMap().size());
- });
- }
-
-}