This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new eb033cef8e Make sure maxMessageSize is correctly set when Thrift is
configured to use a non blocking server (#3134)
eb033cef8e is described below
commit eb033cef8efee856c77235ddae15d0fd81f054c3
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Jan 11 12:09:49 2023 -0500
Make sure maxMessageSize is correctly set when Thrift is configured to use
a non blocking server (#3134)
Make sure maxMessageSize is correctly set when Thrift is configured to
use a non blocking server
TServerUtils did not pass the max frame size to the constructor when
creating a TNonblockingServerSocket, leading to failures if the frame
size exceeded the default.
---
.../apache/accumulo/server/rpc/TServerUtils.java | 8 +-
.../test/functional/ThriftMaxFrameSizeIT.java | 106 +++++++++++++++++++++
2 files changed, 112 insertions(+), 2 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index b1a4f2e50b..3a3088358f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -69,6 +69,8 @@ import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.primitives.Ints;
+
/**
* Factory methods for creating Thrift server objects
*/
@@ -221,7 +223,8 @@ public class TServerUtils {
long timeBetweenThreadChecks, long maxMessageSize) throws
TTransportException {
final TNonblockingServerSocket transport =
- new TNonblockingServerSocket(new InetSocketAddress(address.getHost(),
address.getPort()));
+ new TNonblockingServerSocket(new InetSocketAddress(address.getHost(),
address.getPort()), 0,
+ Ints.saturatedCast(maxMessageSize));
TThreadedSelectorServer.Args options = new
TThreadedSelectorServer.Args(transport);
@@ -256,7 +259,8 @@ public class TServerUtils {
long maxMessageSize) throws TTransportException {
final TNonblockingServerSocket transport =
- new TNonblockingServerSocket(new InetSocketAddress(address.getHost(),
address.getPort()));
+ new TNonblockingServerSocket(new InetSocketAddress(address.getHost(),
address.getPort()), 0,
+ Ints.saturatedCast(maxMessageSize));
final CustomNonBlockingServer.Args options = new
CustomNonBlockingServer.Args(transport);
options.protocolFactory(protocolFactory);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
new file mode 100644
index 0000000000..8b957d74f8
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static
org.apache.accumulo.test.functional.ConfigurableMacBase.configureForSsl;
+
+import java.time.Duration;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TConfiguration;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+public class ThriftMaxFrameSizeIT extends AccumuloClusterHarness {
+
+ private ThriftServerType serverType;
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(1);
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
+ if (serverType == ThriftServerType.SSL) {
+ configureForSsl(cfg,
+ getSslDir(createTestDir(this.getClass().getName() + "_" +
this.testName())));
+ }
+ }
+
+ @Nested
+ class TestDefault extends TestMaxFrameSize {
+ TestDefault() {
+ serverType = ThriftServerType.getDefault();
+ }
+ }
+
+ @Nested
+ class TestThreadedSelector extends TestMaxFrameSize {
+ TestThreadedSelector() {
+ serverType = ThriftServerType.THREADED_SELECTOR;
+ }
+ }
+
+ @Nested
+ class TestCustomHsHa extends TestMaxFrameSize {
+ TestCustomHsHa() {
+ serverType = ThriftServerType.CUSTOM_HS_HA;
+ }
+ }
+
+ @Nested
+ class TestThreadPool extends TestMaxFrameSize {
+ TestThreadPool() {
+ serverType = ThriftServerType.THREADPOOL;
+ }
+ }
+
+ @Nested
+ class TestSsl extends TestMaxFrameSize {
+ TestSsl() {
+ serverType = ThriftServerType.SSL; // takes the configureForSsl branch in configureMiniCluster
+ }
+ }
+
+ protected abstract class TestMaxFrameSize {
+
+ @Test
+ public void testMaxFrameSizeLargerThanDefault() throws Exception {
+
+ // Ingest with a value width greater than the thrift default size to
verify our setting works
+ // for max frame size
+ try (AccumuloClient accumuloClient =
Accumulo.newClient().from(getClientProps()).build()) {
+ String table = getUniqueNames(1)[0];
+ ReadWriteIT.ingest(accumuloClient, 1, 1,
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
+ table);
+ ReadWriteIT.verify(accumuloClient, 1, 1,
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
+ table);
+ }
+ }
+ }
+
+}