This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new bdd88f2b84 Implement custom Thrift protocol to ensure client/server
compatibility (#5691)
bdd88f2b84 is described below
commit bdd88f2b841c21ca3fdb1f58afa4e60f00e05277
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 16 20:37:26 2025 -0400
Implement custom Thrift protocol to ensure client/server compatibility
(#5691)
Implement a custom Thrift protocol that validates that a connection is...
* ... a legitimate Accumulo client (contains the correct magic number and
protocol version),
* ... is running a compatible version of Accumulo with the server (same
major.minor), and
* ... is communicating with the intended Accumulo instance (same InstanceId)
---------
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../core/clientImpl/InstanceOperationsImpl.java | 3 +-
.../accumulo/core/rpc/AccumuloProtocolFactory.java | 236 +++++++++++++++++++++
.../org/apache/accumulo/core/rpc/ThriftUtil.java | 33 ++-
.../accumulo/core/rpc/TraceProtocolFactory.java | 66 ------
.../accumulo/core/rpc/clients/TServerClient.java | 5 +-
.../accumulo/core/rpc/AccumuloProtocolTest.java | 184 ++++++++++++++++
.../accumulo/server/manager/LiveTServerSet.java | 8 +-
.../apache/accumulo/server/rpc/TServerUtils.java | 32 +--
.../apache/accumulo/gc/SimpleGarbageCollector.java | 5 +-
.../accumulo/test/functional/ZombieTServer.java | 2 +-
.../accumulo/test/performance/NullTserver.java | 2 +-
11 files changed, 475 insertions(+), 101 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index d89e208a81..3008b079f0 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -449,7 +449,8 @@ public class InstanceOperationsImpl implements
InstanceOperations {
@Override
public void ping(String server) throws AccumuloException {
try (TTransport transport =
createTransport(AddressUtil.parseAddress(server), context)) {
- ClientService.Client client = createClient(ThriftClientTypes.CLIENT,
transport);
+ ClientService.Client client =
+ createClient(ThriftClientTypes.CLIENT, transport,
context.getInstanceID());
client.ping(context.rpcCreds());
} catch (TException e) {
throw new AccumuloException(e);
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java
new file mode 100644
index 0000000000..f4b122a4dd
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between
different versions of the
+ * protocol. It also traces RPC calls.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isClient;
+ private final InstanceId instanceId;
+
+ static class AccumuloProtocol extends TCompactProtocol {
+
+ static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+ static final byte PROTOCOL_VERSION = 1; // changes only when the header
format changes
+
+ private final boolean isClient;
+ private final InstanceId instanceId;
+
+ private Span span = null;
+ private Scope scope = null;
+
+ private AccumuloProtocol(TTransport transport, InstanceId instanceId,
boolean isClient) {
+ super(transport);
+ this.instanceId = instanceId;
+ this.isClient = isClient;
+ }
+
+ /**
+ * For client calls, write the Accumulo protocol header before writing the
message
+ */
+ @Override
+ public void writeMessageBegin(TMessage message) throws TException {
+ span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+ scope = span.makeCurrent();
+
+ if (this.isClient) {
+ this.writeClientHeader();
+ }
+ super.writeMessageBegin(message);
+ }
+
+ @Override
+ public void writeMessageEnd() throws TException {
+ try {
+ super.writeMessageEnd();
+ } finally {
+ scope.close();
+ span.end();
+ }
+ }
+
+ /**
+ * For server calls, validate the header before reading the message
+ */
+ @Override
+ public TMessage readMessageBegin() throws TException {
+ if (!this.isClient) {
+ this.readAndValidateHeader();
+ }
+
+ return super.readMessageBegin();
+ }
+
+ /**
+ * Writes the Accumulo protocol header containing version and
identification info
+ */
+ private void writeClientHeader() throws TException {
+ super.writeI32(MAGIC_NUMBER);
+ super.writeByte(PROTOCOL_VERSION);
+ super.writeString(Constants.VERSION);
+ super.writeString(this.instanceId.canonical());
+ }
+
+ /**
+ * Reads and validates the Accumulo protocol header
+ *
+ * @throws TException if the header is invalid or incompatible
+ */
+ void readAndValidateHeader() throws TException {
+
+ final int magic;
+ try {
+ magic = super.readI32();
+ } catch (TException e) {
+ throw new TException("Failed to read magic number from header", e);
+ }
+ if (magic != MAGIC_NUMBER) {
+ throw new TException("Invalid Accumulo protocol: magic number
mismatch. Expected: 0x"
+ + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" +
Integer.toHexString(magic));
+ }
+
+ final byte clientProtocolVersion;
+ try {
+ clientProtocolVersion = super.readByte();
+ } catch (TException e) {
+ throw new TException("Failed to read protocol version from header", e);
+ }
+ validateProtocolVersion(clientProtocolVersion);
+
+ final String clientAccumuloVersion;
+ try {
+ clientAccumuloVersion = super.readString();
+ } catch (TException e) {
+ throw new TException("Failed to read accumulo version from header", e);
+ }
+ validateAccumuloVersion(clientAccumuloVersion);
+
+ final String clientInstanceId;
+ try {
+ clientInstanceId = super.readString();
+ } catch (TException e) {
+ throw new TException("Failed to read instance id from header", e);
+ }
+ validateInstanceId(clientInstanceId);
+ }
+
+ /**
+ * @throws TException if the given protocol version is incompatible with
the current version
+ */
+ private void validateProtocolVersion(byte protocolVersion) throws
TException {
+ if (protocolVersion != PROTOCOL_VERSION) {
+ throw new TException("Incompatible protocol version. Version seen: " +
protocolVersion
+ + ", expected version: " + PROTOCOL_VERSION);
+ }
+ }
+
+ /**
+ * @throws TException if the given Accumulo version (client) is
incompatible with the current
+ * version (server)
+ */
+ private void validateAccumuloVersion(String clientAccumuloVersion) throws
TException {
+ final String serverAccumuloVersion = Constants.VERSION;
+
+ // Extract major.minor version components
+ final String serverMajorMinor =
extractMajorMinorVersion(serverAccumuloVersion);
+ final String clientMajorMinor =
extractMajorMinorVersion(clientAccumuloVersion);
+
+ if (!serverMajorMinor.equals(clientMajorMinor)) {
+ throw new TException("Incompatible Accumulo versions. Client version: "
+ + clientAccumuloVersion + ", Server version: " +
serverAccumuloVersion
+ + ". Major.minor versions must match.");
+ }
+ }
+
+ /**
+ * @return the major.minor portion from a version string (e.g.,
"4.0.0-SNAPSHOT" -> "4.0")
+ */
+ private String extractMajorMinorVersion(String version) throws TException {
+ final int lastDotIndex = version.lastIndexOf('.');
+ if (lastDotIndex == -1) {
+ throw new TException("Invalid version format: " + version);
+ }
+ return version.substring(0, lastDotIndex);
+ }
+
+ /**
+ * @throws TException if the given instance ID (client) does not match the
current instance ID
+ * (server)
+ */
+ private void validateInstanceId(String clientInstanceId) throws TException
{
+ if (!clientInstanceId.equals(this.instanceId.canonical())) {
+ throw new TException(
+ "Mismatched instance ID in header. Expected to match server
instance ID: "
+ + this.instanceId + ", but got: " + clientInstanceId);
+ }
+ }
+
+ }
+
+ @Override
+ public AccumuloProtocol getProtocol(TTransport trans) {
+ return new AccumuloProtocol(requireNonNull(trans), this.instanceId,
isClient);
+ }
+
+ /**
+ * Creates a factory for producing AccumuloProtocol instances
+ *
+ * @param instanceId the instance ID of the client or server
+ * @param isClient true if this factory produces protocols for the client
side, false for the
+ * server side
+ */
+ private AccumuloProtocolFactory(InstanceId instanceId, boolean isClient) {
+ this.isClient = isClient;
+ this.instanceId = requireNonNull(instanceId);
+ }
+
+ /**
+ * Creates a client-side factory for use in clients making RPC calls
+ */
+ public static AccumuloProtocolFactory clientFactory(InstanceId instanceId) {
+ return new AccumuloProtocolFactory(instanceId, true);
+ }
+
+ /**
+ * Creates a server-side factory for use in servers receiving RPC calls
+ */
+ public static AccumuloProtocolFactory serverFactory(InstanceId instanceId) {
+ return new AccumuloProtocolFactory(instanceId, false);
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index e7792e3b8d..4fd6d57e66 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.hadoop.security.UserGroupInformation;
@@ -53,7 +54,6 @@ public class ThriftUtil {
private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
- private static final TraceProtocolFactory protocolFactory = new
TraceProtocolFactory();
private static final AccumuloTFramedTransportFactory transportFactory =
new AccumuloTFramedTransportFactory(Integer.MAX_VALUE);
private static final Map<Integer,TTransportFactory> factoryCache = new
HashMap<>();
@@ -64,12 +64,27 @@ public class ThriftUtil {
private static final int RELOGIN_MAX_BACKOFF = 5000;
/**
- * An instance of {@link TraceProtocolFactory}
+ * Returns the client-side Accumulo protocol factory used for RPC.
+ * <p>
+ * This protocol factory creates protocol instances that prepend a custom
header with magic number
+ * and protocol version
+ *
+ * @return The client-side Accumulo TProtocolFactory for RPC
+ */
+ public static TProtocolFactory clientProtocolFactory(InstanceId instanceId) {
+ return AccumuloProtocolFactory.clientFactory(instanceId);
+ }
+
+ /**
+ * Returns the server-side Accumulo protocol factory used for RPC.
+ * <p>
+ * This protocol factory creates protocol instances that validate a custom
header with magic
+ * number and protocol version
*
- * @return The default Thrift TProtocolFactory for RPC
+ * @return The server-side Accumulo TProtocolFactory for RPC
*/
- public static TProtocolFactory protocolFactory() {
- return protocolFactory;
+ public static TProtocolFactory serverProtocolFactory(InstanceId instanceId) {
+ return AccumuloProtocolFactory.serverFactory(instanceId);
}
/**
@@ -85,8 +100,8 @@ public class ThriftUtil {
* Create a Thrift client using the given factory and transport
*/
public static <T extends TServiceClient> T createClient(ThriftClientTypes<T>
type,
- TTransport transport) {
- return type.getClient(protocolFactory.getProtocol(transport));
+ TTransport transport, InstanceId instanceId) {
+ return
type.getClient(clientProtocolFactory(instanceId).getProtocol(transport));
}
/**
@@ -114,7 +129,7 @@ public class ThriftUtil {
HostAndPort address, ClientContext context) throws TTransportException {
TTransport transport = context.getTransportPool().getTransport(type,
address,
context.getClientTimeoutInMillis(), context, true);
- return createClient(type, transport);
+ return createClient(type, transport, context.getInstanceID());
}
/**
@@ -130,7 +145,7 @@ public class ThriftUtil {
HostAndPort address, ClientContext context, long timeout) throws
TTransportException {
TTransport transport =
context.getTransportPool().getTransport(type, address, timeout,
context, true);
- return createClient(type, transport);
+ return createClient(type, transport, context.getInstanceID());
}
public static void close(TServiceClient client, ClientContext context) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
deleted file mode 100644
index de8e0d7bf0..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.rpc;
-
-import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMessage;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
-
-/**
- * {@link org.apache.thrift.protocol.TCompactProtocol.Factory} implementation
which uses a protocol
- * which traces
- */
-public class TraceProtocolFactory extends TCompactProtocol.Factory {
- private static final long serialVersionUID = 1L;
-
- private static class TraceProtocol extends TCompactProtocol {
-
- private Span span = null;
- private Scope scope = null;
-
- public TraceProtocol(TTransport transport) {
- super(transport);
- }
-
- @Override
- public void writeMessageBegin(TMessage message) throws TException {
- span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
- scope = span.makeCurrent();
- super.writeMessageBegin(message);
- }
-
- @Override
- public void writeMessageEnd() throws TException {
- super.writeMessageEnd();
- scope.close();
- span.end();
- }
- }
-
- @Override
- public TProtocol getProtocol(TTransport trans) {
- return new TraceProtocol(trans);
- }
-}
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 6265d981bb..b510422084 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -109,7 +109,8 @@ public interface TServerClient<C extends TServiceClient> {
Pair<String,TTransport> cachedTransport =
context.getTransportPool().getAnyCachedTransport(type, context,
service, rgp);
if (cachedTransport != null) {
- C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+ C client =
+ ThriftUtil.createClient(type, cachedTransport.getSecond(),
context.getInstanceID());
warned.set(false);
return new Pair<String,C>(cachedTransport.getFirst(), client);
}
@@ -160,7 +161,7 @@ public interface TServerClient<C extends TServiceClient> {
try {
TTransport transport =
context.getTransportPool().getTransport(type,
tserverClientAddress, rpcTimeout, context,
preferCachedConnections);
- C client = ThriftUtil.createClient(type, transport);
+ C client = ThriftUtil.createClient(type, transport,
context.getInstanceID());
if (type == ThriftClientTypes.CLIENT && debugHostSpecified) {
LOG.info("Connecting to debug host: {}", debugHost);
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java
b/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java
new file mode 100644
index 0000000000..26a5f2ec60
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloProtocolTest {
+
+ private static final int VALID_MAGIC_NUMBER =
+ AccumuloProtocolFactory.AccumuloProtocol.MAGIC_NUMBER;
+ private static final int INVALID_MAGIC_NUMBER = 0x12345678;
+ private static final byte VALID_PROTOCOL_VERSION =
+ AccumuloProtocolFactory.AccumuloProtocol.PROTOCOL_VERSION;
+ private static final byte INVALID_PROTOCOL_VERSION = 99;
+ private static final InstanceId INSTANCE_ID = InstanceId.of("instanceId");
+
+ /**
+ * Test that a valid header does not throw an exception
+ */
+ @Test
+ public void testValidHeader() throws TException {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ protocol.writeI32(VALID_MAGIC_NUMBER);
+ protocol.writeByte(VALID_PROTOCOL_VERSION);
+ protocol.writeString(Constants.VERSION);
+ protocol.writeString(INSTANCE_ID.canonical());
+
+ var serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ serverProtocol.readAndValidateHeader();
+
+ assertEquals(0, transport.read(new byte[10], 0, 10), "Expected all data
to be consumed");
+ }
+ }
+
+ /**
+ * Test that an invalid magic number throws an exception
+ */
+ @Test
+ public void testInvalidMagicNumber() throws TException {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+
+ // only need to write the magic number since its checked first
+ protocol.writeI32(INVALID_MAGIC_NUMBER);
+
+ AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ var e = assertThrows(TException.class,
serverProtocol::readAndValidateHeader);
+ assertTrue(e.getMessage().contains("magic number mismatch"),
+ "Expected bad magic number msg. Got: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Test that an incompatible protocol version number throws an exception
+ */
+ @Test
+ public void testIncompatibleProtocolVersion() throws TException {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ protocol.writeI32(VALID_MAGIC_NUMBER);
+ protocol.writeByte(INVALID_PROTOCOL_VERSION);
+ // don't need to write the other header parts since it should fail
before reading them
+
+ AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ var e = assertThrows(TException.class,
serverProtocol::readAndValidateHeader);
+ assertTrue(e.getMessage().contains("Incompatible protocol version"),
+ "Expected incompatible version msg. Got: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Test that compatible accumulo version (same major.minor) passes validation
+ */
+ @Test
+ public void testCompatibleVersions() throws TException {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ protocol.writeI32(VALID_MAGIC_NUMBER);
+ protocol.writeByte(VALID_PROTOCOL_VERSION);
+
+ // Write current version but with different patch version
+ String serverMajorMinor = Constants.VERSION.substring(0,
Constants.VERSION.lastIndexOf('.'));
+ String clientVersion = serverMajorMinor + ".999";
+
+ protocol.writeString(clientVersion);
+ protocol.writeString(INSTANCE_ID.canonical());
+
+ AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ serverProtocol.readAndValidateHeader();
+ }
+ }
+
+ /**
+ * Test that incompatible accumulo version (different major.minor) throws an
exception
+ */
+ @Test
+ public void testIncompatibleVersions() throws TException {
+ // increment major version number so it is incompatible
+ String[] parts = Constants.VERSION.split("\\.");
+ String incompatibleVersionMajorBump = (Integer.parseInt(parts[0]) + 1) +
".0.0";
+ String incompatibleVersionMinorBump = parts[0] + "." +
(Integer.parseInt(parts[1]) + 1) + ".0";
+ String incompatibleVersionMajorMinorBump =
+ (Integer.parseInt(parts[0]) + 1) + "." + (Integer.parseInt(parts[1]) +
1) + ".0";
+
+ for (String incompatibleVersion : Set.of(incompatibleVersionMajorBump,
+ incompatibleVersionMinorBump, incompatibleVersionMajorMinorBump)) {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ protocol.writeI32(VALID_MAGIC_NUMBER);
+ protocol.writeByte(VALID_PROTOCOL_VERSION);
+
+ protocol.writeString(incompatibleVersion);
+
+ AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ var e = assertThrows(TException.class,
serverProtocol::readAndValidateHeader);
+ assertTrue(e.getMessage().contains("Incompatible Accumulo versions"),
+ "Expected incompatible version msg. Got: " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Test that an incomplete protocol header throws an exception
+ */
+ @Test
+ public void testIncompleteHeader() throws TException {
+ try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+ TCompactProtocol protocol = new TCompactProtocol(transport);
+ protocol.writeI32(VALID_MAGIC_NUMBER);
+ protocol.writeByte(VALID_PROTOCOL_VERSION);
+ // don't write the version string
+ protocol.writeBool(false);
+
+ AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+ var e = assertThrows(TException.class,
serverProtocol::readAndValidateHeader);
+ assertTrue(e.getMessage().contains("Failed to read accumulo version from
header"),
+ "Expected incomplete header msg. Got: " + e.getMessage());
+ }
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 194809d454..8d7f5ee653 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -106,8 +106,8 @@ public class LiveTServerSet implements ZooCacheWatcher {
if (extent.isMeta()) {
// see ACCUMULO-3597
try (TTransport transport = ThriftUtil.createTransport(address,
context)) {
- TabletManagementClientService.Client client =
- ThriftUtil.createClient(ThriftClientTypes.TABLET_MGMT,
transport);
+ TabletManagementClientService.Client client = ThriftUtil
+ .createClient(ThriftClientTypes.TABLET_MGMT, transport,
context.getInstanceID());
loadTablet(client, lock, extent);
}
} else {
@@ -143,8 +143,8 @@ public class LiveTServerSet implements ZooCacheWatcher {
long start = System.currentTimeMillis();
try (TTransport transport = ThriftUtil.createTransport(address,
context)) {
- TabletServerClientService.Client client =
- ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER,
transport);
+ TabletServerClientService.Client client = ThriftUtil
+ .createClient(ThriftClientTypes.TABLET_SERVER, transport,
context.getInstanceID());
TabletServerStatus status =
client.getTabletServerStatus(TraceUtil.traceInfo(),
context.rpcCreds());
if (status != null) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 24287b9d9f..7d6544d110 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.PropertyType;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -171,8 +172,8 @@ public class TServerUtils {
HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
- return TServerUtils.createThriftServer(serverType, timedProcessor,
serverName, minThreads,
- threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
+ return TServerUtils.createThriftServer(serverType, timedProcessor,
context.getInstanceID(),
+ serverName, minThreads, threadTimeOut, config,
timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(), context.getSaslParams(),
context.getClientTimeoutInMillis(),
backlog, portSearch, addresses);
} catch (TTransportException e) {
@@ -196,10 +197,11 @@ public class TServerUtils {
}
try {
HostAndPort addr = HostAndPort.fromParts(hostname, port);
- return TServerUtils.createThriftServer(serverType, timedProcessor,
serverName,
- minThreads, threadTimeOut, config, timeBetweenThreadChecks,
maxMessageSize,
- context.getServerSslParams(), context.getSaslParams(),
- context.getClientTimeoutInMillis(), backlog, portSearch, addr);
+ return TServerUtils.createThriftServer(serverType, timedProcessor,
+ context.getInstanceID(), serverName, minThreads,
threadTimeOut, config,
+ timeBetweenThreadChecks, maxMessageSize,
context.getServerSslParams(),
+ context.getSaslParams(), context.getClientTimeoutInMillis(),
backlog, portSearch,
+ addr);
} catch (TTransportException tte) {
log.info("Unable to use port {}, retrying.", port);
}
@@ -565,8 +567,8 @@ public class TServerUtils {
* {@code ServerAddress#startThriftServer(String)}
*/
public static ServerAddress createThriftServer(final AccumuloConfiguration
conf,
- ThriftServerType serverType, TProcessor processor, String serverName,
int numThreads,
- long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize,
+ ThriftServerType serverType, TProcessor processor, InstanceId
instanceId, String serverName,
+ int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long
maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean
portSearch,
HostAndPort... addresses) {
@@ -576,9 +578,9 @@ public class TServerUtils {
}
try {
- return createThriftServer(serverType, new TimedProcessor(processor,
metricsInfo), serverName,
- numThreads, threadTimeOut, conf, timeBetweenThreadChecks,
maxMessageSize, sslParams,
- saslParams, serverSocketTimeout, backlog, portSearch, addresses);
+ return createThriftServer(serverType, new TimedProcessor(processor,
metricsInfo), instanceId,
+ serverName, numThreads, threadTimeOut, conf,
timeBetweenThreadChecks, maxMessageSize,
+ sslParams, saslParams, serverSocketTimeout, backlog, portSearch,
addresses);
} catch (TTransportException e) {
throw new IllegalStateException(e);
}
@@ -592,12 +594,12 @@ public class TServerUtils {
* bound to.
*/
private static ServerAddress createThriftServer(ThriftServerType serverType,
- TimedProcessor processor, String serverName, int numThreads, long
threadTimeOut,
- final AccumuloConfiguration conf, long timeBetweenThreadChecks, long
maxMessageSize,
- SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+ TimedProcessor processor, InstanceId instanceId, String serverName, int
numThreads,
+ long threadTimeOut, final AccumuloConfiguration conf, long
timeBetweenThreadChecks,
+ long maxMessageSize, SslConnectionParams sslParams,
SaslServerConnectionParams saslParams,
long serverSocketTimeout, int backlog, boolean portSearch,
HostAndPort... addresses)
throws TTransportException {
- TProtocolFactory protocolFactory = ThriftUtil.protocolFactory();
+ TProtocolFactory protocolFactory =
ThriftUtil.serverProtocolFactory(instanceId);
// This is presently not supported. It's hypothetically possible, I
believe, to work, but it
// would require changes in how the transports
// work at the Thrift layer to ensure that both the SSL and SASL
handshakes function. SASL's
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5b09324d4a..bc5edfa168 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -426,8 +426,9 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
long maxMessageSize =
getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE);
updateThriftServer(() -> {
return TServerUtils.createThriftServer(getConfiguration(),
getContext().getThriftServerType(),
- processor, this.getClass().getSimpleName(), 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
- 1000, maxMessageSize, getContext().getServerSslParams(),
getContext().getSaslParams(), 0,
+ processor, getContext().getInstanceID(),
this.getClass().getSimpleName(), 2,
+ ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
+ getContext().getServerSslParams(), getContext().getSaslParams(), 0,
getConfiguration().getCount(Property.RPC_BACKLOG),
getContext().getMetricsInfo(), false,
addresses);
}, true);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 2e9768397a..3437310e34 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -125,7 +125,7 @@ public class ZombieTServer {
TabletScanClientService.Iface.class, tch, context));
ServerAddress serverPort =
TServerUtils.createThriftServer(context.getConfiguration(),
- ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", 2,
+ ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(),
"ZombieTServer", 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null,
null, -1,
context.getConfiguration().getCount(Property.RPC_BACKLOG),
context.getMetricsInfo(), false,
HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, port));
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index b03abdb8a8..785eeacfca 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -316,7 +316,7 @@ public class NullTserver {
TabletManagementClientService.Iface.class, tch, context));
ServerAddress sa =
TServerUtils.createThriftServer(context.getConfiguration(),
- ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", 2,
+ ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(),
"NullTServer", 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null,
null, -1,
context.getConfiguration().getCount(Property.RPC_BACKLOG),
context.getMetricsInfo(), false,
HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, opts.port));