This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new eb033cef8e Make sure maxMessageSize is correctly set when Thrift is 
configured to use a non blocking server (#3134)
eb033cef8e is described below

commit eb033cef8efee856c77235ddae15d0fd81f054c3
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Wed Jan 11 12:09:49 2023 -0500

    Make sure maxMessageSize is correctly set when Thrift is configured to use 
a non blocking server (#3134)
    
    Make sure maxMessageSize is correctly set when Thrift is configured to
    use a non blocking server
    
    TServerUtils was not correctly setting the max frame size value on the
    constructor when creating a TNonblockingServerSocket leading to failures
    if the frame size exceeded the default.
---
 .../apache/accumulo/server/rpc/TServerUtils.java   |   8 +-
 .../test/functional/ThriftMaxFrameSizeIT.java      | 106 +++++++++++++++++++++
 2 files changed, 112 insertions(+), 2 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index b1a4f2e50b..3a3088358f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -69,6 +69,8 @@ import org.apache.thrift.transport.TTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.primitives.Ints;
+
 /**
  * Factory methods for creating Thrift server objects
  */
@@ -221,7 +223,8 @@ public class TServerUtils {
       long timeBetweenThreadChecks, long maxMessageSize) throws 
TTransportException {
 
     final TNonblockingServerSocket transport =
-        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()));
+        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
+            Ints.saturatedCast(maxMessageSize));
 
     TThreadedSelectorServer.Args options = new 
TThreadedSelectorServer.Args(transport);
 
@@ -256,7 +259,8 @@ public class TServerUtils {
       long maxMessageSize) throws TTransportException {
 
     final TNonblockingServerSocket transport =
-        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()));
+        new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), 
address.getPort()), 0,
+            Ints.saturatedCast(maxMessageSize));
     final CustomNonBlockingServer.Args options = new 
CustomNonBlockingServer.Args(transport);
 
     options.protocolFactory(protocolFactory);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
new file mode 100644
index 0000000000..8b957d74f8
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static 
org.apache.accumulo.test.functional.ConfigurableMacBase.configureForSsl;
+
+import java.time.Duration;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TConfiguration;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+public class ThriftMaxFrameSizeIT extends AccumuloClusterHarness {
+
+  private ThriftServerType serverType;
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(1);
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
+    if (serverType == ThriftServerType.SSL) {
+      configureForSsl(cfg,
+          getSslDir(createTestDir(this.getClass().getName() + "_" + 
this.testName())));
+    }
+  }
+
+  @Nested
+  class TestDefault extends TestMaxFrameSize {
+    TestDefault() {
+      serverType = ThriftServerType.getDefault();
+    }
+  }
+
+  @Nested
+  class TestThreadedSelector extends TestMaxFrameSize {
+    TestThreadedSelector() {
+      serverType = ThriftServerType.THREADED_SELECTOR;
+    }
+  }
+
+  @Nested
+  class TestCustomHsHa extends TestMaxFrameSize {
+    TestCustomHsHa() {
+      serverType = ThriftServerType.CUSTOM_HS_HA;
+    }
+  }
+
+  @Nested
+  class TestThreadPool extends TestMaxFrameSize {
+    TestThreadPool() {
+      serverType = ThriftServerType.THREADPOOL;
+    }
+  }
+
+  @Nested
+  class TestSsl extends TestMaxFrameSize {
+    TestSsl() {
+      serverType = ThriftServerType.THREADPOOL;
+    }
+  }
+
+  protected abstract class TestMaxFrameSize {
+
+    @Test
+    public void testMaxFrameSizeLargerThanDefault() throws Exception {
+
+      // Ingest with a value width greater than the thrift default size to 
verify our setting works
+      // for max frame wize
+      try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProps()).build()) {
+        String table = getUniqueNames(1)[0];
+        ReadWriteIT.ingest(accumuloClient, 1, 1, 
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
+            table);
+        ReadWriteIT.verify(accumuloClient, 1, 1, 
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
+            table);
+      }
+    }
+  }
+
+}

Reply via email to