This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new eb033cef8e Make sure maxMessageSize is correctly set when Thrift is configured to use a non blocking server (#3134) eb033cef8e is described below commit eb033cef8efee856c77235ddae15d0fd81f054c3 Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Wed Jan 11 12:09:49 2023 -0500 Make sure maxMessageSize is correctly set when Thrift is configured to use a non blocking server (#3134) Make sure maxMessageSize is correctly set when Thrift is configured to use a non blocking server TServerUtils was not correctly setting the max frame size value on the constructor when creating a TNonblockingServerSocket leading to failures if the frame size exceeded the default. --- .../apache/accumulo/server/rpc/TServerUtils.java | 8 +- .../test/functional/ThriftMaxFrameSizeIT.java | 106 +++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index b1a4f2e50b..3a3088358f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -69,6 +69,8 @@ import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.primitives.Ints; + /** * Factory methods for creating Thrift server objects */ @@ -221,7 +223,8 @@ public class TServerUtils { long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { final TNonblockingServerSocket transport = - new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort())); + new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, + Ints.saturatedCast(maxMessageSize)); TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport); @@ -256,7 +259,8 @@ public class TServerUtils { long maxMessageSize) throws TTransportException { final TNonblockingServerSocket transport = - new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort())); + new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, + Ints.saturatedCast(maxMessageSize)); final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport); options.protocolFactory(protocolFactory); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java new file mode 100644 index 0000000000..8b957d74f8 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.test.functional.ConfigurableMacBase.configureForSsl; + +import java.time.Duration; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TConfiguration; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +public class ThriftMaxFrameSizeIT extends AccumuloClusterHarness { + + private ThriftServerType serverType; + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name()); + if (serverType == ThriftServerType.SSL) { + configureForSsl(cfg, + getSslDir(createTestDir(this.getClass().getName() + "_" + this.testName()))); + } + } + + @Nested + class TestDefault extends TestMaxFrameSize { + TestDefault() { + serverType = ThriftServerType.getDefault(); + } + } + + @Nested + class TestThreadedSelector extends TestMaxFrameSize { + TestThreadedSelector() { + serverType = ThriftServerType.THREADED_SELECTOR; + } + } + + @Nested + class TestCustomHsHa extends TestMaxFrameSize { + TestCustomHsHa() { + serverType = ThriftServerType.CUSTOM_HS_HA; + } + } + + @Nested + class TestThreadPool extends TestMaxFrameSize { + TestThreadPool() { + serverType = ThriftServerType.THREADPOOL; + } + } + + @Nested + class TestSsl extends TestMaxFrameSize { + TestSsl() { + serverType = ThriftServerType.THREADPOOL; + } + } + + protected abstract class TestMaxFrameSize { + + @Test + public void testMaxFrameSizeLargerThanDefault() throws Exception { + + // Ingest with a value width greater than the thrift default size to verify our setting works + // for max frame wize + try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { + String table = getUniqueNames(1)[0]; + ReadWriteIT.ingest(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, + table); + ReadWriteIT.verify(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, + table); + } + } + } + +}