This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new bdd88f2b84 Implement custom Thrift protocol to ensure client/server 
compatibility (#5691)
bdd88f2b84 is described below

commit bdd88f2b841c21ca3fdb1f58afa4e60f00e05277
Author: Dom G. <[email protected]>
AuthorDate: Tue Sep 16 20:37:26 2025 -0400

    Implement custom Thrift protocol to ensure client/server compatibility 
(#5691)
    
    Implement a custom Thrift protocol that validates that a connection is...
    
    * ... a legitimate Accumulo client (contains the correct magic number and 
protocol version),
    * ... is running a compatible version of Accumulo with the server (same 
major.minor), and
    * ... is communicating with the intended Accumulo instance (same InstanceId)
    
    ---------
    
    Co-authored-by: Christopher Tubbs <[email protected]>
---
 .../core/clientImpl/InstanceOperationsImpl.java    |   3 +-
 .../accumulo/core/rpc/AccumuloProtocolFactory.java | 236 +++++++++++++++++++++
 .../org/apache/accumulo/core/rpc/ThriftUtil.java   |  33 ++-
 .../accumulo/core/rpc/TraceProtocolFactory.java    |  66 ------
 .../accumulo/core/rpc/clients/TServerClient.java   |   5 +-
 .../accumulo/core/rpc/AccumuloProtocolTest.java    | 184 ++++++++++++++++
 .../accumulo/server/manager/LiveTServerSet.java    |   8 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   |  32 +--
 .../apache/accumulo/gc/SimpleGarbageCollector.java |   5 +-
 .../accumulo/test/functional/ZombieTServer.java    |   2 +-
 .../accumulo/test/performance/NullTserver.java     |   2 +-
 11 files changed, 475 insertions(+), 101 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index d89e208a81..3008b079f0 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -449,7 +449,8 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
   @Override
   public void ping(String server) throws AccumuloException {
     try (TTransport transport = 
createTransport(AddressUtil.parseAddress(server), context)) {
-      ClientService.Client client = createClient(ThriftClientTypes.CLIENT, 
transport);
+      ClientService.Client client =
+          createClient(ThriftClientTypes.CLIENT, transport, 
context.getInstanceID());
       client.ping(context.rpcCreds());
     } catch (TException e) {
       throw new AccumuloException(e);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java
new file mode 100644
index 0000000000..f4b122a4dd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloProtocolFactory.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.transport.TTransport;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Factory for creating instances of the AccumuloProtocol.
+ * <p>
+ * This protocol includes a custom header to ensure compatibility between 
different versions of the
+ * protocol. It also traces RPC calls.
+ */
+public class AccumuloProtocolFactory extends TCompactProtocol.Factory {
+
+  private static final long serialVersionUID = 1L;
+
+  private final boolean isClient;
+  private final InstanceId instanceId;
+
+  static class AccumuloProtocol extends TCompactProtocol {
+
+    static final int MAGIC_NUMBER = 0x41434355; // "ACCU" in ASCII
+    static final byte PROTOCOL_VERSION = 1; // changes only when the header 
format changes
+
+    private final boolean isClient;
+    private final InstanceId instanceId;
+
+    private Span span = null;
+    private Scope scope = null;
+
+    private AccumuloProtocol(TTransport transport, InstanceId instanceId, 
boolean isClient) {
+      super(transport);
+      this.instanceId = instanceId;
+      this.isClient = isClient;
+    }
+
+    /**
+     * For client calls, write the Accumulo protocol header before writing the 
message
+     */
+    @Override
+    public void writeMessageBegin(TMessage message) throws TException {
+      span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
+      scope = span.makeCurrent();
+
+      if (this.isClient) {
+        this.writeClientHeader();
+      }
+      super.writeMessageBegin(message);
+    }
+
+    @Override
+    public void writeMessageEnd() throws TException {
+      try {
+        super.writeMessageEnd();
+      } finally {
+        scope.close();
+        span.end();
+      }
+    }
+
+    /**
+     * For server calls, validate the header before reading the message
+     */
+    @Override
+    public TMessage readMessageBegin() throws TException {
+      if (!this.isClient) {
+        this.readAndValidateHeader();
+      }
+
+      return super.readMessageBegin();
+    }
+
+    /**
+     * Writes the Accumulo protocol header containing version and 
identification info
+     */
+    private void writeClientHeader() throws TException {
+      super.writeI32(MAGIC_NUMBER);
+      super.writeByte(PROTOCOL_VERSION);
+      super.writeString(Constants.VERSION);
+      super.writeString(this.instanceId.canonical());
+    }
+
+    /**
+     * Reads and validates the Accumulo protocol header
+     *
+     * @throws TException if the header is invalid or incompatible
+     */
+    void readAndValidateHeader() throws TException {
+
+      final int magic;
+      try {
+        magic = super.readI32();
+      } catch (TException e) {
+        throw new TException("Failed to read magic number from header", e);
+      }
+      if (magic != MAGIC_NUMBER) {
+        throw new TException("Invalid Accumulo protocol: magic number 
mismatch. Expected: 0x"
+            + Integer.toHexString(MAGIC_NUMBER) + ", got: 0x" + 
Integer.toHexString(magic));
+      }
+
+      final byte clientProtocolVersion;
+      try {
+        clientProtocolVersion = super.readByte();
+      } catch (TException e) {
+        throw new TException("Failed to read protocol version from header", e);
+      }
+      validateProtocolVersion(clientProtocolVersion);
+
+      final String clientAccumuloVersion;
+      try {
+        clientAccumuloVersion = super.readString();
+      } catch (TException e) {
+        throw new TException("Failed to read accumulo version from header", e);
+      }
+      validateAccumuloVersion(clientAccumuloVersion);
+
+      final String clientInstanceId;
+      try {
+        clientInstanceId = super.readString();
+      } catch (TException e) {
+        throw new TException("Failed to read instance id from header", e);
+      }
+      validateInstanceId(clientInstanceId);
+    }
+
+    /**
+     * @throws TException if the given protocol version is incompatible with 
the current version
+     */
+    private void validateProtocolVersion(byte protocolVersion) throws 
TException {
+      if (protocolVersion != PROTOCOL_VERSION) {
+        throw new TException("Incompatible protocol version. Version seen: " + 
protocolVersion
+            + ", expected version: " + PROTOCOL_VERSION);
+      }
+    }
+
+    /**
+     * @throws TException if the given Accumulo version (client) is 
incompatible with the current
+     *         version (server)
+     */
+    private void validateAccumuloVersion(String clientAccumuloVersion) throws 
TException {
+      final String serverAccumuloVersion = Constants.VERSION;
+
+      // Extract major.minor version components
+      final String serverMajorMinor = 
extractMajorMinorVersion(serverAccumuloVersion);
+      final String clientMajorMinor = 
extractMajorMinorVersion(clientAccumuloVersion);
+
+      if (!serverMajorMinor.equals(clientMajorMinor)) {
+        throw new TException("Incompatible Accumulo versions. Client version: "
+            + clientAccumuloVersion + ", Server version: " + 
serverAccumuloVersion
+            + ". Major.minor versions must match.");
+      }
+    }
+
+    /**
+     * @return the major.minor portion from a version string (e.g., 
"4.0.0-SNAPSHOT" -> "4.0")
+     */
+    private String extractMajorMinorVersion(String version) throws TException {
+      final int lastDotIndex = version.lastIndexOf('.');
+      if (lastDotIndex == -1) {
+        throw new TException("Invalid version format: " + version);
+      }
+      return version.substring(0, lastDotIndex);
+    }
+
+    /**
+     * @throws TException if the given instance ID (client) does not match the 
current instance ID
+     *         (server)
+     */
+    private void validateInstanceId(String clientInstanceId) throws TException 
{
+      if (!clientInstanceId.equals(this.instanceId.canonical())) {
+        throw new TException(
+            "Mismatched instance ID in header. Expected to match server 
instance ID: "
+                + this.instanceId + ", but got: " + clientInstanceId);
+      }
+    }
+
+  }
+
+  @Override
+  public AccumuloProtocol getProtocol(TTransport trans) {
+    return new AccumuloProtocol(requireNonNull(trans), this.instanceId, 
isClient);
+  }
+
+  /**
+   * Creates a factory for producing AccumuloProtocol instances
+   *
+   * @param instanceId the instance ID of the client or server
+   * @param isClient true if this factory produces protocols for the client 
side, false for the
+   *        server side
+   */
+  private AccumuloProtocolFactory(InstanceId instanceId, boolean isClient) {
+    this.isClient = isClient;
+    this.instanceId = requireNonNull(instanceId);
+  }
+
+  /**
+   * Creates a client-side factory for use in clients making RPC calls
+   */
+  public static AccumuloProtocolFactory clientFactory(InstanceId instanceId) {
+    return new AccumuloProtocolFactory(instanceId, true);
+  }
+
+  /**
+   * Creates a server-side factory for use in servers receiving RPC calls
+   */
+  public static AccumuloProtocolFactory serverFactory(InstanceId instanceId) {
+    return new AccumuloProtocolFactory(instanceId, false);
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index e7792e3b8d..4fd6d57e66 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,7 +54,6 @@ public class ThriftUtil {
 
   private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
 
-  private static final TraceProtocolFactory protocolFactory = new 
TraceProtocolFactory();
   private static final AccumuloTFramedTransportFactory transportFactory =
       new AccumuloTFramedTransportFactory(Integer.MAX_VALUE);
   private static final Map<Integer,TTransportFactory> factoryCache = new 
HashMap<>();
@@ -64,12 +64,27 @@ public class ThriftUtil {
   private static final int RELOGIN_MAX_BACKOFF = 5000;
 
   /**
-   * An instance of {@link TraceProtocolFactory}
+   * Returns the client-side Accumulo protocol factory used for RPC.
+   * <p>
+   * This protocol factory creates protocol instances that prepend a custom 
header with magic number
+   * and protocol version
+   *
+   * @return The client-side Accumulo TProtocolFactory for RPC
+   */
+  public static TProtocolFactory clientProtocolFactory(InstanceId instanceId) {
+    return AccumuloProtocolFactory.clientFactory(instanceId);
+  }
+
+  /**
+   * Returns the server-side Accumulo protocol factory used for RPC.
+   * <p>
+   * This protocol factory creates protocol instances that validate a custom 
header with magic
+   * number and protocol version
    *
-   * @return The default Thrift TProtocolFactory for RPC
+   * @return The server-side Accumulo TProtocolFactory for RPC
    */
-  public static TProtocolFactory protocolFactory() {
-    return protocolFactory;
+  public static TProtocolFactory serverProtocolFactory(InstanceId instanceId) {
+    return AccumuloProtocolFactory.serverFactory(instanceId);
   }
 
   /**
@@ -85,8 +100,8 @@ public class ThriftUtil {
    * Create a Thrift client using the given factory and transport
    */
   public static <T extends TServiceClient> T createClient(ThriftClientTypes<T> 
type,
-      TTransport transport) {
-    return type.getClient(protocolFactory.getProtocol(transport));
+      TTransport transport, InstanceId instanceId) {
+    return 
type.getClient(clientProtocolFactory(instanceId).getProtocol(transport));
   }
 
   /**
@@ -114,7 +129,7 @@ public class ThriftUtil {
       HostAndPort address, ClientContext context) throws TTransportException {
     TTransport transport = context.getTransportPool().getTransport(type, 
address,
         context.getClientTimeoutInMillis(), context, true);
-    return createClient(type, transport);
+    return createClient(type, transport, context.getInstanceID());
   }
 
   /**
@@ -130,7 +145,7 @@ public class ThriftUtil {
       HostAndPort address, ClientContext context, long timeout) throws 
TTransportException {
     TTransport transport =
         context.getTransportPool().getTransport(type, address, timeout, 
context, true);
-    return createClient(type, transport);
+    return createClient(type, transport, context.getInstanceID());
   }
 
   public static void close(TServiceClient client, ClientContext context) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
deleted file mode 100644
index de8e0d7bf0..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.rpc;
-
-import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMessage;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
-
-/**
- * {@link org.apache.thrift.protocol.TCompactProtocol.Factory} implementation 
which uses a protocol
- * which traces
- */
-public class TraceProtocolFactory extends TCompactProtocol.Factory {
-  private static final long serialVersionUID = 1L;
-
-  private static class TraceProtocol extends TCompactProtocol {
-
-    private Span span = null;
-    private Scope scope = null;
-
-    public TraceProtocol(TTransport transport) {
-      super(transport);
-    }
-
-    @Override
-    public void writeMessageBegin(TMessage message) throws TException {
-      span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
-      scope = span.makeCurrent();
-      super.writeMessageBegin(message);
-    }
-
-    @Override
-    public void writeMessageEnd() throws TException {
-      super.writeMessageEnd();
-      scope.close();
-      span.end();
-    }
-  }
-
-  @Override
-  public TProtocol getProtocol(TTransport trans) {
-    return new TraceProtocol(trans);
-  }
-}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java 
b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
index 6265d981bb..b510422084 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java
@@ -109,7 +109,8 @@ public interface TServerClient<C extends TServiceClient> {
       Pair<String,TTransport> cachedTransport =
           context.getTransportPool().getAnyCachedTransport(type, context, 
service, rgp);
       if (cachedTransport != null) {
-        C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+        C client =
+            ThriftUtil.createClient(type, cachedTransport.getSecond(), 
context.getInstanceID());
         warned.set(false);
         return new Pair<String,C>(cachedTransport.getFirst(), client);
       }
@@ -160,7 +161,7 @@ public interface TServerClient<C extends TServiceClient> {
           try {
             TTransport transport = 
context.getTransportPool().getTransport(type,
                 tserverClientAddress, rpcTimeout, context, 
preferCachedConnections);
-            C client = ThriftUtil.createClient(type, transport);
+            C client = ThriftUtil.createClient(type, transport, 
context.getInstanceID());
             if (type == ThriftClientTypes.CLIENT && debugHostSpecified) {
               LOG.info("Connecting to debug host: {}", debugHost);
             }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java 
b/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java
new file mode 100644
index 0000000000..26a5f2ec60
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/AccumuloProtocolTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloProtocolTest {
+
+  private static final int VALID_MAGIC_NUMBER =
+      AccumuloProtocolFactory.AccumuloProtocol.MAGIC_NUMBER;
+  private static final int INVALID_MAGIC_NUMBER = 0x12345678;
+  private static final byte VALID_PROTOCOL_VERSION =
+      AccumuloProtocolFactory.AccumuloProtocol.PROTOCOL_VERSION;
+  private static final byte INVALID_PROTOCOL_VERSION = 99;
+  private static final InstanceId INSTANCE_ID = InstanceId.of("instanceId");
+
+  /**
+   * Test that a valid header does not throw an exception
+   */
+  @Test
+  public void testValidHeader() throws TException {
+    try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      protocol.writeI32(VALID_MAGIC_NUMBER);
+      protocol.writeByte(VALID_PROTOCOL_VERSION);
+      protocol.writeString(Constants.VERSION);
+      protocol.writeString(INSTANCE_ID.canonical());
+
+      var serverProtocol =
+          
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+      serverProtocol.readAndValidateHeader();
+
+      assertEquals(0, transport.read(new byte[10], 0, 10), "Expected all data 
to be consumed");
+    }
+  }
+
+  /**
+   * Test that an invalid magic number throws an exception
+   */
+  @Test
+  public void testInvalidMagicNumber() throws TException {
+    try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+
+      // only need to write the magic number since its checked first
+      protocol.writeI32(INVALID_MAGIC_NUMBER);
+
+      AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+          
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+      var e = assertThrows(TException.class, 
serverProtocol::readAndValidateHeader);
+      assertTrue(e.getMessage().contains("magic number mismatch"),
+          "Expected bad magic number msg. Got: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Test that an incompatible protocol version number throws an exception
+   */
+  @Test
+  public void testIncompatibleProtocolVersion() throws TException {
+    try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      protocol.writeI32(VALID_MAGIC_NUMBER);
+      protocol.writeByte(INVALID_PROTOCOL_VERSION);
+      // don't need to write the other header parts since it should fail 
before reading them
+
+      AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+          
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+      var e = assertThrows(TException.class, 
serverProtocol::readAndValidateHeader);
+      assertTrue(e.getMessage().contains("Incompatible protocol version"),
+          "Expected incompatible version msg. Got: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Test that compatible accumulo version (same major.minor) passes validation
+   */
+  @Test
+  public void testCompatibleVersions() throws TException {
+    try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      protocol.writeI32(VALID_MAGIC_NUMBER);
+      protocol.writeByte(VALID_PROTOCOL_VERSION);
+
+      // Write current version but with different patch version
+      String serverMajorMinor = Constants.VERSION.substring(0, 
Constants.VERSION.lastIndexOf('.'));
+      String clientVersion = serverMajorMinor + ".999";
+
+      protocol.writeString(clientVersion);
+      protocol.writeString(INSTANCE_ID.canonical());
+
+      AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+          
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+      serverProtocol.readAndValidateHeader();
+    }
+  }
+
+  /**
+   * Test that incompatible accumulo version (different major.minor) throws an 
exception
+   */
+  @Test
+  public void testIncompatibleVersions() throws TException {
+    // increment major version number so it is incompatible
+    String[] parts = Constants.VERSION.split("\\.");
+    String incompatibleVersionMajorBump = (Integer.parseInt(parts[0]) + 1) + 
".0.0";
+    String incompatibleVersionMinorBump = parts[0] + "." + 
(Integer.parseInt(parts[1]) + 1) + ".0";
+    String incompatibleVersionMajorMinorBump =
+        (Integer.parseInt(parts[0]) + 1) + "." + (Integer.parseInt(parts[1]) + 
1) + ".0";
+
+    for (String incompatibleVersion : Set.of(incompatibleVersionMajorBump,
+        incompatibleVersionMinorBump, incompatibleVersionMajorMinorBump)) {
+      try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+        TCompactProtocol protocol = new TCompactProtocol(transport);
+        protocol.writeI32(VALID_MAGIC_NUMBER);
+        protocol.writeByte(VALID_PROTOCOL_VERSION);
+
+        protocol.writeString(incompatibleVersion);
+
+        AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+            
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+        var e = assertThrows(TException.class, 
serverProtocol::readAndValidateHeader);
+        assertTrue(e.getMessage().contains("Incompatible Accumulo versions"),
+            "Expected incompatible version msg. Got: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Test that an incomplete protocol header throws an exception
+   */
+  @Test
+  public void testIncompleteHeader() throws TException {
+    try (TMemoryBuffer transport = new TMemoryBuffer(100)) {
+
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      protocol.writeI32(VALID_MAGIC_NUMBER);
+      protocol.writeByte(VALID_PROTOCOL_VERSION);
+      // don't write the version string
+      protocol.writeBool(false);
+
+      AccumuloProtocolFactory.AccumuloProtocol serverProtocol =
+          
AccumuloProtocolFactory.serverFactory(INSTANCE_ID).getProtocol(transport);
+
+      var e = assertThrows(TException.class, 
serverProtocol::readAndValidateHeader);
+      assertTrue(e.getMessage().contains("Failed to read accumulo version from 
header"),
+          "Expected incomplete header msg. Got: " + e.getMessage());
+    }
+  }
+}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 194809d454..8d7f5ee653 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -106,8 +106,8 @@ public class LiveTServerSet implements ZooCacheWatcher {
       if (extent.isMeta()) {
         // see ACCUMULO-3597
         try (TTransport transport = ThriftUtil.createTransport(address, 
context)) {
-          TabletManagementClientService.Client client =
-              ThriftUtil.createClient(ThriftClientTypes.TABLET_MGMT, 
transport);
+          TabletManagementClientService.Client client = ThriftUtil
+              .createClient(ThriftClientTypes.TABLET_MGMT, transport, 
context.getInstanceID());
           loadTablet(client, lock, extent);
         }
       } else {
@@ -143,8 +143,8 @@ public class LiveTServerSet implements ZooCacheWatcher {
       long start = System.currentTimeMillis();
 
       try (TTransport transport = ThriftUtil.createTransport(address, 
context)) {
-        TabletServerClientService.Client client =
-            ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, 
transport);
+        TabletServerClientService.Client client = ThriftUtil
+            .createClient(ThriftClientTypes.TABLET_SERVER, transport, 
context.getInstanceID());
         TabletServerStatus status =
             client.getTabletServerStatus(TraceUtil.traceInfo(), 
context.rpcCreds());
         if (status != null) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 24287b9d9f..7d6544d110 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.PropertyType;
 import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -171,8 +172,8 @@ public class TServerUtils {
 
     HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
     try {
-      return TServerUtils.createThriftServer(serverType, timedProcessor, 
serverName, minThreads,
-          threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
+      return TServerUtils.createThriftServer(serverType, timedProcessor, 
context.getInstanceID(),
+          serverName, minThreads, threadTimeOut, config, 
timeBetweenThreadChecks, maxMessageSize,
           context.getServerSslParams(), context.getSaslParams(), 
context.getClientTimeoutInMillis(),
           backlog, portSearch, addresses);
     } catch (TTransportException e) {
@@ -196,10 +197,11 @@ public class TServerUtils {
           }
           try {
             HostAndPort addr = HostAndPort.fromParts(hostname, port);
-            return TServerUtils.createThriftServer(serverType, timedProcessor, 
serverName,
-                minThreads, threadTimeOut, config, timeBetweenThreadChecks, 
maxMessageSize,
-                context.getServerSslParams(), context.getSaslParams(),
-                context.getClientTimeoutInMillis(), backlog, portSearch, addr);
+            return TServerUtils.createThriftServer(serverType, timedProcessor,
+                context.getInstanceID(), serverName, minThreads, 
threadTimeOut, config,
+                timeBetweenThreadChecks, maxMessageSize, 
context.getServerSslParams(),
+                context.getSaslParams(), context.getClientTimeoutInMillis(), 
backlog, portSearch,
+                addr);
           } catch (TTransportException tte) {
             log.info("Unable to use port {}, retrying.", port);
           }
@@ -565,8 +567,8 @@ public class TServerUtils {
    * {@code ServerAddress#startThriftServer(String)}
    */
   public static ServerAddress createThriftServer(final AccumuloConfiguration 
conf,
-      ThriftServerType serverType, TProcessor processor, String serverName, 
int numThreads,
-      long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize,
+      ThriftServerType serverType, TProcessor processor, InstanceId 
instanceId, String serverName,
+      int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long 
maxMessageSize,
       SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
       long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean 
portSearch,
       HostAndPort... addresses) {
@@ -576,9 +578,9 @@ public class TServerUtils {
     }
 
     try {
-      return createThriftServer(serverType, new TimedProcessor(processor, 
metricsInfo), serverName,
-          numThreads, threadTimeOut, conf, timeBetweenThreadChecks, 
maxMessageSize, sslParams,
-          saslParams, serverSocketTimeout, backlog, portSearch, addresses);
+      return createThriftServer(serverType, new TimedProcessor(processor, 
metricsInfo), instanceId,
+          serverName, numThreads, threadTimeOut, conf, 
timeBetweenThreadChecks, maxMessageSize,
+          sslParams, saslParams, serverSocketTimeout, backlog, portSearch, 
addresses);
     } catch (TTransportException e) {
       throw new IllegalStateException(e);
     }
@@ -592,12 +594,12 @@ public class TServerUtils {
    *         bound to.
    */
   private static ServerAddress createThriftServer(ThriftServerType serverType,
-      TimedProcessor processor, String serverName, int numThreads, long 
threadTimeOut,
-      final AccumuloConfiguration conf, long timeBetweenThreadChecks, long 
maxMessageSize,
-      SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+      TimedProcessor processor, InstanceId instanceId, String serverName, int 
numThreads,
+      long threadTimeOut, final AccumuloConfiguration conf, long 
timeBetweenThreadChecks,
+      long maxMessageSize, SslConnectionParams sslParams, 
SaslServerConnectionParams saslParams,
       long serverSocketTimeout, int backlog, boolean portSearch, 
HostAndPort... addresses)
       throws TTransportException {
-    TProtocolFactory protocolFactory = ThriftUtil.protocolFactory();
+    TProtocolFactory protocolFactory = 
ThriftUtil.serverProtocolFactory(instanceId);
     // This is presently not supported. It's hypothetically possible, I 
believe, to work, but it
     // would require changes in how the transports
     // work at the Thrift layer to ensure that both the SSL and SASL 
handshakes function. SASL's
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5b09324d4a..bc5edfa168 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -426,8 +426,9 @@ public class SimpleGarbageCollector extends AbstractServer 
implements Iface {
     long maxMessageSize = 
getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE);
     updateThriftServer(() -> {
       return TServerUtils.createThriftServer(getConfiguration(), 
getContext().getThriftServerType(),
-          processor, this.getClass().getSimpleName(), 2, 
ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
-          1000, maxMessageSize, getContext().getServerSslParams(), 
getContext().getSaslParams(), 0,
+          processor, getContext().getInstanceID(), 
this.getClass().getSimpleName(), 2,
+          ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize,
+          getContext().getServerSslParams(), getContext().getSaslParams(), 0,
           getConfiguration().getCount(Property.RPC_BACKLOG), 
getContext().getMetricsInfo(), false,
           addresses);
     }, true);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 2e9768397a..3437310e34 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -125,7 +125,7 @@ public class ZombieTServer {
             TabletScanClientService.Iface.class, tch, context));
 
     ServerAddress serverPort = 
TServerUtils.createThriftServer(context.getConfiguration(),
-        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", 2,
+        ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(), 
"ZombieTServer", 2,
         ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, 
null, -1,
         context.getConfiguration().getCount(Property.RPC_BACKLOG), 
context.getMetricsInfo(), false,
         HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, port));
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java 
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index b03abdb8a8..785eeacfca 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -316,7 +316,7 @@ public class NullTserver {
             TabletManagementClientService.Iface.class, tch, context));
 
     ServerAddress sa = 
TServerUtils.createThriftServer(context.getConfiguration(),
-        ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", 2,
+        ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(), 
"NullTServer", 2,
         ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, 
null, -1,
         context.getConfiguration().getCount(Property.RPC_BACKLOG), 
context.getMetricsInfo(), false,
         HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, opts.port));


Reply via email to