[
https://issues.apache.org/jira/browse/GEODE-3895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236671#comment-16236671
]
ASF GitHub Bot commented on GEODE-3895:
---------------------------------------
galen-pivotal closed pull request #1001: GEODE-3895: Add Handshake/Message
version byte
URL: https://github.com/apache/geode/pull/1001
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index a673c036b1..e6a797c3dc 100755
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -63,7 +63,8 @@
import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
import
org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
import org.apache.geode.internal.cache.tier.sockets.HandShake;
-import
org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -385,8 +386,9 @@ private void processRequest(final Socket socket) {
if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
&& Boolean.getBoolean("geode.feature-protobuf-protocol")) {
try {
+ int protocolVersion = input.readUnsignedByte();
ClientProtocolService clientProtocolService =
- clientProtocolServiceLoader.lookupService();
+ clientProtocolServiceLoader.lookupService(protocolVersion);
clientProtocolService.initializeStatistics("LocatorStats",
internalLocator.getDistributedSystem());
try (ClientProtocolProcessor pipeline =
@@ -400,6 +402,11 @@ private void processRequest(final Socket socket) {
log.error("There was an error looking up the client protocol
service", e);
socket.close();
throw new IOException("There was an error looking up the client
protocol service", e);
+ } catch (ServiceVersionNotFoundException e) {
+ log.error("Unable to find service matching the client protocol
version byte", e);
+ socket.close();
+ throw new IOException(
+ "Unable to find service matching the client protocol version
byte", e);
}
} else {
rejectUnknownProtocolConnection(socket, gossipVersion);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
index 7f50c9fbe6..b86d76a5bf 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java
@@ -38,4 +38,6 @@
* Create a locator processor. The locator does not currently provide any
authentication.
*/
ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
+
+ int getServiceProtocolVersion();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
index c7ba6e0dce..4b66062ff0 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolServiceLoader.java
@@ -19,7 +19,8 @@
import java.util.List;
import java.util.ServiceLoader;
-import
org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
public class ClientProtocolServiceLoader {
private final List<ClientProtocolService> clientProtocolServices;
@@ -38,7 +39,7 @@ public ClientProtocolServiceLoader() {
return resultList;
}
- public ClientProtocolService lookupService() {
+ public ClientProtocolService lookupService(int protocolVersion) {
if (clientProtocolServices.isEmpty()) {
throw new ServiceLoadingFailureException(
"There is no ClientProtocolService implementation found in JVM");
@@ -48,6 +49,11 @@ public ClientProtocolService lookupService() {
throw new ServiceLoadingFailureException(
"There is more than one ClientProtocolService implementation found
in JVM; aborting");
}
- return clientProtocolServices.get(0);
+ ClientProtocolService clientProtocolService =
clientProtocolServices.get(0);
+ if (clientProtocolService.getServiceProtocolVersion() != protocolVersion) {
+ throw new ServiceVersionNotFoundException(
+ "The ClientProtocolService doesn't match the requested version.");
+ }
+ return clientProtocolService;
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
similarity index 94%
rename from
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
rename to
geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
index be39672067..2c448456f3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServiceLoadingFailureException.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceLoadingFailureException.java
@@ -13,7 +13,7 @@
* the License.
*/
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.cache.client.protocol.exception;
import org.apache.geode.GemFireException;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
new file mode 100644
index 0000000000..d6af72712e
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/exception/ServiceVersionNotFoundException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.client.protocol.exception;
+
+import org.apache.geode.GemFireException;
+
+/**
+ * Indicates that no service is found for the given service version.
+ */
+public class ServiceVersionNotFoundException extends GemFireException {
+ public ServiceVersionNotFoundException(String message) {
+ super(message);
+ }
+
+ public ServiceVersionNotFoundException(Exception cause) {
+ super(cause);
+ }
+
+ public ServiceVersionNotFoundException(String message, Exception cause) {
+ super(message, cause);
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index a6fc973d02..7070b3799c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -25,6 +25,8 @@
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
import
org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceVersionNotFoundException;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.security.SecurityService;
@@ -42,9 +44,9 @@ public ServerConnectionFactory() {
private synchronized ClientProtocolService getClientProtocolService(
- StatisticsFactory statisticsFactory, String serverName) {
+ StatisticsFactory statisticsFactory, String serverName, int
protocolVersion) {
if (clientProtocolService == null) {
- clientProtocolService = clientProtocolServiceLoader.lookupService();
+ clientProtocolService =
clientProtocolServiceLoader.lookupService(protocolVersion);
clientProtocolService.initializeStatistics(serverName,
statisticsFactory);
}
return clientProtocolService;
@@ -58,11 +60,15 @@ public ServerConnection makeServerConnection(Socket socket,
InternalCache cache,
if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
throw new IOException("Server received unknown communication mode: " +
communicationMode);
} else {
+ int protocolVersion = readProtocolVersionByte(socket);
try {
return createGenericProtocolServerConnection(socket, cache, helper,
stats, hsTimeout,
- socketBufferSize, communicationModeStr, communicationMode,
acceptor, securityService);
+ socketBufferSize, communicationModeStr, communicationMode,
acceptor, securityService,
+ protocolVersion);
} catch (ServiceLoadingFailureException ex) {
throw new IOException("Could not load protobuf client protocol", ex);
+ } catch (ServiceVersionNotFoundException ex) {
+ throw new IOException("No service matching provided version byte",
ex);
}
}
} else {
@@ -71,12 +77,16 @@ public ServerConnection makeServerConnection(Socket socket,
InternalCache cache,
}
}
+ private int readProtocolVersionByte(Socket socket) throws IOException {
+ return socket.getInputStream().read();
+ }
+
private ServerConnection createGenericProtocolServerConnection(Socket
socket, InternalCache cache,
CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int
socketBufferSize,
String communicationModeStr, byte communicationMode, Acceptor acceptor,
- SecurityService securityService) {
- ClientProtocolService service =
- getClientProtocolService(cache.getDistributedSystem(),
acceptor.getServerName());
+ SecurityService securityService, int protocolVersion) {
+ ClientProtocolService service =
getClientProtocolService(cache.getDistributedSystem(),
+ acceptor.getServerName(), protocolVersion);
ClientProtocolProcessor processor = service.createProcessorForCache(cache,
securityService);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index a3f34b080e..e11b206564 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -16,6 +16,7 @@
package org.apache.geode.internal.cache.tier.sockets;
import org.apache.geode.internal.cache.InternalCache;
+import
org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.security.SecurityService;
@@ -28,6 +29,7 @@
import org.junit.experimental.categories.Category;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
@@ -108,6 +110,9 @@ private ServerConnection
serverConnectionMockedExceptForCommunicationMode(byte c
throws IOException {
Socket socketMock = mock(Socket.class);
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
+ InputStream streamMock = mock(InputStream.class);
+ when(streamMock.read()).thenReturn(1);
+ when(socketMock.getInputStream()).thenReturn(streamMock);
return new ServerConnectionFactory().makeServerConnection(socketMock,
mock(InternalCache.class),
mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0,
"", communicationMode,
diff --git
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
index c87398fa80..669787d6df 100644
---
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
+++
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolService.java
@@ -59,4 +59,9 @@ ProtocolClientStatistics getStatistics() {
public ClientProtocolProcessor createProcessorForLocator(InternalLocator
locator) {
return new ProtobufLocatorPipeline(protobufStreamProcessor,
getStatistics(), locator);
}
+
+ @Override
+ public int getServiceProtocolVersion() {
+ return ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE;
+ }
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
index 1e632ca607..2977c5ae9e 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthenticationIntegrationTest.java
@@ -20,6 +20,7 @@
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
@@ -88,7 +89,8 @@ public void setupCacheServerAndSocket() throws Exception {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
- outputStream.write(110);
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
protobufProtocolSerializer = new ProtobufProtocolSerializer();
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
index 8f2390b997..4c0cd5286b 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/AuthorizationIntegrationTest.java
@@ -18,6 +18,7 @@
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import
org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
@@ -107,7 +108,8 @@ public void setUp() throws IOException,
InvalidProtocolMessageException {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
- outputStream.write(110);
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
protobufProtocolSerializer = new ProtobufProtocolSerializer();
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
new file mode 100644
index 0000000000..d3b78fd34c
--- /dev/null
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/HandshakeIntegrationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
+import
org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class HandshakeIntegrationTest {
+ private Cache cache;
+
+ @Rule
+ public final RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
+
+ private OutputStream outputStream;
+ private InputStream inputStream;
+ private ProtobufProtocolSerializer protobufProtocolSerializer;
+ private Socket socket;
+ private SocketChannel socketChannel;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("geode.feature-protobuf-protocol", "true");
+
+ // Create a cache with security disabled
+ Properties properties = new Properties();
+ CacheFactory cacheFactory = new CacheFactory(properties);
+ cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+ cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION,
"false");
+ cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION,
"false");
+ cache = cacheFactory.create();
+
+ CacheServer cacheServer = cache.addCacheServer();
+ int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+ cacheServer.setPort(cacheServerPort);
+ cacheServer.start();
+
+ InetSocketAddress localhost = new InetSocketAddress("localhost",
cacheServerPort);
+ socketChannel = SocketChannel.open(localhost);
+
+ socket = socketChannel.socket();
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+ outputStream = socket.getOutputStream();
+ inputStream = socket.getInputStream();
+
+ protobufProtocolSerializer = new ProtobufProtocolSerializer();
+ }
+
+ @After
+ public void tearDown() {
+ if (cache != null) {
+ cache.close();
+ }
+ }
+
+ @Test
+ public void testNormalHandshakeSucceeds() throws Exception {
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
+
+ ClientProtocol.Message.newBuilder()
+ .setRequest(ClientProtocol.Request.newBuilder()
+ .setHandshakeRequest(ConnectionAPI.HandshakeRequest.newBuilder()
+
.setMajorVersion(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
+
.setMinorVersion(ConnectionAPI.MinorVersions.CURRENT_MINOR_VERSION_VALUE)))
+ .build().writeDelimitedTo(outputStream);
+ ClientProtocol.Message handshakeResponse =
protobufProtocolSerializer.deserialize(inputStream);
+
assertTrue(handshakeResponse.getResponse().getHandshakeResponse().getHandshakePassed());
+ }
+
+ @Test
+ public void testInvalidMajorVersionBreaksConnection() throws Exception {
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.INVALID_MAJOR_VERSION_VALUE);
+
+ // Verify that connection is closed
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ try {
+ assertEquals(-1, socket.getInputStream().read()); // EOF implies
disconnected.
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
index 4257911e0a..d61ac868f4 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
@@ -25,6 +25,7 @@
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -57,6 +58,7 @@
import java.net.Socket;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -121,7 +123,6 @@ public void setup() throws Exception {
cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION,
"false");
cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION,
"false");
cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
- cacheFactory.setSecurityManager(null);
cache = cacheFactory.create();
CacheServer cacheServer = cache.addCacheServer();
@@ -141,7 +142,8 @@ public void setup() throws Exception {
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
- outputStream.write(110);
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
}
@@ -196,8 +198,11 @@ public void testBasicMessagesAndStats() throws Exception {
@Test
public void testConnectionCountIsProperlyDecremented() throws Exception {
- CacheServer cacheServer =
this.cache.getCacheServers().stream().findFirst().get();
+ List<CacheServer> cacheServers = this.cache.getCacheServers();
+ assertEquals(1, cacheServers.size());
+ CacheServer cacheServer = cacheServers.stream().findFirst().get();
AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+
Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> acceptor.getClientServerCnxCount() == 1);
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
index 61a09f25d1..05f56eefb4 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionTimeoutJUnitTest.java
@@ -26,6 +26,8 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
@@ -100,7 +102,8 @@ public void setup() throws Exception {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
- outputStream.write(110);
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
index a256e84f83..fb5ed5eb39 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheMaxConnectionJUnitTest.java
@@ -102,7 +102,8 @@ public void setup() throws Exception {
socket = new Socket("localhost", cacheServerPort);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
- outputStream.write(110);
+
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
serializationService = new ProtobufSerializationService();
protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -188,6 +189,7 @@ private void validateSocketCreationAndDestruction(int
cacheServerPort, int conne
Awaitility.await().atMost(5,
TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
ClientProtocol.Message.newBuilder()
.setRequest(ClientProtocol.Request.newBuilder()
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
index c81f0d6095..248794485c 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
@@ -138,6 +138,7 @@ public void setup() throws Exception {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
outputStream.write(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
ClientProtocol.Message.newBuilder()
.setRequest(ClientProtocol.Request.newBuilder()
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
index 3d239b586f..2558b274fb 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -24,6 +24,7 @@
import java.net.Socket;
import java.util.Properties;
+import org.apache.geode.internal.protocol.protobuf.ConnectionAPI;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -75,6 +76,7 @@ private Socket createSocket() throws IOException {
dataOutputStream.writeInt(0);
// Using the constant from AcceptorImpl to ensure that magic byte is the
same
dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
+
dataOutputStream.writeByte(ConnectionAPI.MajorVersions.CURRENT_MAJOR_VERSION_VALUE);
return socket;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add Handshake/Message version byte
> ----------------------------------
>
> Key: GEODE-3895
> URL: https://issues.apache.org/jira/browse/GEODE-3895
> Project: Geode
> Issue Type: Improvement
> Components: client/server
> Reporter: Brian Baynes
> Priority: Major
>
> Add an extra protocol version byte so that the first two bytes will
> (initially) be 110 followed by 1. The '1' byte will be increased when the
> HandshakeRequest or Message changes in such a way that we can't make the
> handshake backward-compatible.
> Ensure that clients on newer versions will have their connections terminated.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)