This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b1b2557f94 Use custom Transport Factory to set Transport message and
frame size (#3737)
b1b2557f94 is described below
commit b1b2557f949e9212a1b1ca9b65f2d66c01a69edb
Author: Dave Marion <[email protected]>
AuthorDate: Fri Sep 1 12:04:27 2023 -0400
Use custom Transport Factory to set Transport message and frame size (#3737)
Use a custom TFramedTransport.Factory implementation so that, when
getTransport is called, the max frame size and max message size are set
on the underlying transport's configuration.
This is a workaround for https://issues.apache.org/jira/browse/THRIFT-5732
Fixes #3731
Also:
* Throw EOFException from TabletServerBatchReaderIterator when the
TTransportException type is END_OF_FILE
* Refactor ThriftMaxFrameSizeIT to test messages both larger and
smaller than the configured maximum, and to use a separate mini
cluster directory for each test
* Include stack trace in TabletServerBatchWriter log message for debugging
* Add a default value for timeout.factor in the Wait class to avoid an
error message, and a fallback timeout factor of 24, in IDEs when the
system property isn't set
---------
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../TabletServerBatchReaderIterator.java | 11 +-
.../core/clientImpl/TabletServerBatchWriter.java | 2 +-
.../core/rpc/AccumuloTFramedTransportFactory.java | 58 +++++++++
.../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +-
.../apache/accumulo/core/rpc/ThriftUtilTest.java | 132 +++++++++++++++++++++
.../test/functional/ThriftMaxFrameSizeIT.java | 115 ++++++++++++------
.../java/org/apache/accumulo/test/util/Wait.java | 2 +-
7 files changed, 282 insertions(+), 45 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 963a9f2c4a..b937f40a66 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.io.EOFException;
import java.io.IOException;
import java.time.Duration;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -407,7 +408,8 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
failures.putAll(tsFailures);
}
}
-
+ } catch (EOFException e) {
+ fatalException = e;
} catch (IOException e) {
if
(!TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) {
synchronized (failures) {
@@ -910,6 +912,13 @@ public class TabletServerBatchReaderIterator implements
Iterator<Entry<Key,Value
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
timeoutTracker.errorOccured();
+ if (e.getType() == TTransportException.END_OF_FILE) {
+ // END_OF_FILE is used in TEndpointTransport when the
+ // maxMessageSize has been reached.
+ EOFException eof = new EOFException(e.getMessage());
+ eof.addSuppressed(e);
+ throw eof;
+ }
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index de66339886..fadc408bea 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -916,7 +916,7 @@ public class TabletServerBatchWriter implements
AutoCloseable {
span.end();
}
} catch (IOException e) {
- log.debug("failed to send mutations to {} : {}", location,
e.getMessage());
+ log.debug("failed to send mutations to {}", location, e);
HashSet<TableId> tables = new HashSet<>();
for (KeyExtent ke : mutationBatch.keySet()) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java
new file mode 100644
index 0000000000..85882f500e
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
+
+/**
+ * This is a workaround for the issue reported in
https://issues.apache.org/jira/browse/THRIFT-5732
+ * and can be removed once that issue is fixed.
+ */
+public class AccumuloTFramedTransportFactory extends TFramedTransport.Factory {
+
+ private final int maxMessageSize;
+
+ public AccumuloTFramedTransportFactory(int maxMessageSize) {
+ super(maxMessageSize);
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException {
+ // The input parameter "base" is typically going to be a TSocket
implementation
+ // that represents a connection between two Accumulo endpoints
(client-server,
+ // or server-server). The base transport has a maxMessageSize which
defaults to
+ // 100MB. The FramedTransport that is created by this factory adds a
header to
+ // the message with payload size information. The FramedTransport has a
default
+ // frame size of 16MB, but the TFramedTransport constructor sets the frame
size
+ // to the frame size set on the underlying transport ("base" in this
case").
+ // According to current Thrift docs, a message has to fit into 1 frame, so
the
+ // frame size will be set to the value that is lower. Prior to this class
being
+ // created, we were only setting the frame size, so messages were capped
at 100MB
+ // because that's the default maxMessageSize. Here we are setting the
maxMessageSize
+ // and maxFrameSize to the same value on the "base" transport so that when
the
+ // TFramedTransport object is created, it ends up using the values that we
want.
+ base.getConfiguration().setMaxFrameSize(maxMessageSize);
+ base.getConfiguration().setMaxMessageSize(maxMessageSize);
+ return super.getTransport(base);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index d961188509..a448908f75 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -49,7 +49,6 @@ import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import org.apache.thrift.transport.layered.TFramedTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,8 +62,8 @@ public class ThriftUtil {
private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);
private static final TraceProtocolFactory protocolFactory = new
TraceProtocolFactory();
- private static final TFramedTransport.Factory transportFactory =
- new TFramedTransport.Factory(Integer.MAX_VALUE);
+ private static final AccumuloTFramedTransportFactory transportFactory =
+ new AccumuloTFramedTransportFactory(Integer.MAX_VALUE);
private static final Map<Integer,TTransportFactory> factoryCache = new
HashMap<>();
public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5";
@@ -186,7 +185,7 @@ public class ThriftUtil {
int maxFrameSize1 = (int) maxFrameSize;
TTransportFactory factory = factoryCache.get(maxFrameSize1);
if (factory == null) {
- factory = new TFramedTransport.Factory(maxFrameSize1);
+ factory = new AccumuloTFramedTransportFactory(maxFrameSize1);
factoryCache.put(maxFrameSize1, factory);
}
return factory;
diff --git
a/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java
b/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java
new file mode 100644
index 0000000000..dd3df43eb7
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.thrift.transport.TByteBuffer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
+import org.junit.jupiter.api.Test;
+
+public class ThriftUtilTest {
+
+ public static final int FRAME_HDR_SIZE = 4;
+ public static final int MB1 = 1 * 1024 * 1024;
+ public static final int MB10 = 10 * 1024 * 1024;
+ public static final int MB100 = 100 * 1024 * 1024;
+ public static final int GB = 1 * 1024 * 1024 * 1024;
+
+ @Test
+ public void testDefaultTFramedTransportFactory() throws TTransportException {
+
+ // This test confirms that the default maxMessageSize in Thrift is 100MB
+ // even when we set the frame size to be 1GB
+
+ TByteBuffer underlyingTransport = new
TByteBuffer(ByteBuffer.allocate(1024));
+
+ TFramedTransport.Factory factory = new TFramedTransport.Factory(GB);
+ TTransport framedTransport = factory.getTransport(underlyingTransport);
+
+ assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB);
+ assertEquals(framedTransport.getConfiguration().getMaxMessageSize(),
MB100);
+ }
+
+ @Test
+ public void testAccumuloTFramedTransportFactory() throws TTransportException
{
+
+ // This test confirms that our custom FramedTransportFactory sets the max
+ // message size and max frame size to the value that we want.
+
+ TByteBuffer underlyingTransport = new
TByteBuffer(ByteBuffer.allocate(1024));
+
+ AccumuloTFramedTransportFactory factory = new
AccumuloTFramedTransportFactory(GB);
+ TTransport framedTransport = factory.getTransport(underlyingTransport);
+
+ assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB);
+ assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), GB);
+ }
+
+ @Test
+ public void testMessageSizeReadWriteSuccess() throws Exception {
+
+ // This test creates an 10MB buffer in memory as the underlying transport,
then
+ // creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize.
It then
+ // writes 1MB - 4 bytes (to account for the frame header) to the transport
and
+ // reads the data back out.
+
+ TByteBuffer underlyingTransport = new
TByteBuffer(ByteBuffer.allocate(MB10));
+ AccumuloTFramedTransportFactory factory = new
AccumuloTFramedTransportFactory(MB1);
+ TTransport framedTransport = factory.getTransport(underlyingTransport);
+ assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1);
+ assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1);
+
+ byte[] writeBuf = new byte[MB1 - FRAME_HDR_SIZE];
+ Arrays.fill(writeBuf, (byte) 1);
+ framedTransport.write(writeBuf);
+ framedTransport.flush();
+
+ assertEquals(MB1, underlyingTransport.getByteBuffer().position());
+ underlyingTransport.flip();
+ assertEquals(0, underlyingTransport.getByteBuffer().position());
+ assertEquals(MB1, underlyingTransport.getByteBuffer().limit());
+
+ byte[] readBuf = new byte[MB1];
+ framedTransport.read(readBuf, 0, MB1);
+ }
+
+ @Test
+ public void testMessageSizeWriteFailure() throws Exception {
+
+ // This test creates an 10MB buffer in memory as the underlying transport,
then
+ // creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize.
It then
+ // writes 1MB + 100 bytes to the transport, which fails as it's larger
than the
+ // configured frame and message size.
+
+ TByteBuffer underlyingTransport = new
TByteBuffer(ByteBuffer.allocate(MB10));
+ AccumuloTFramedTransportFactory factory = new
AccumuloTFramedTransportFactory(MB1);
+ TTransport framedTransport = factory.getTransport(underlyingTransport);
+ assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1);
+ assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1);
+
+ // Write more than 1MB to the TByteBuffer, it's possible to write more data
+ // than allowed by the frame, it's enforced on the read.
+ final int ourSize = MB1 + 100;
+ byte[] writeBuf = new byte[ourSize];
+ Arrays.fill(writeBuf, (byte) 1);
+ framedTransport.write(writeBuf);
+ framedTransport.flush();
+
+ assertEquals(ourSize + FRAME_HDR_SIZE,
underlyingTransport.getByteBuffer().position());
+ underlyingTransport.flip();
+ assertEquals(0, underlyingTransport.getByteBuffer().position());
+ assertEquals(ourSize + FRAME_HDR_SIZE,
underlyingTransport.getByteBuffer().limit());
+
+ byte[] readBuf = new byte[ourSize];
+ var e =
+ assertThrows(TTransportException.class, () ->
framedTransport.read(readBuf, 0, ourSize));
+ assertEquals("Frame size (" + ourSize + ") larger than max length (" + MB1
+ ")!",
+ e.getMessage());
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
index 8b957d74f8..0a94d0a974 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@ -18,89 +18,128 @@
*/
package org.apache.accumulo.test.functional;
-import static
org.apache.accumulo.test.functional.ConfigurableMacBase.configureForSsl;
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
import java.time.Duration;
import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TConfiguration;
import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-public class ThriftMaxFrameSizeIT extends AccumuloClusterHarness {
+@Tag(MINI_CLUSTER_ONLY)
+public class ThriftMaxFrameSizeIT {
private ThriftServerType serverType;
- @Override
- protected Duration defaultTimeout() {
- return Duration.ofMinutes(1);
- }
-
- @Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
- cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
- if (serverType == ThriftServerType.SSL) {
- configureForSsl(cfg,
- getSslDir(createTestDir(this.getClass().getName() + "_" +
this.testName())));
- }
- }
+ // use something other than TConfiguration.DEFAULT_MAX_FRAME_SIZE to make
sure the override works
+ // small values seem to be insufficient for Accumulo, at least for this test
+ private static final int CONFIGURED_MAX_FRAME_SIZE = 32 * 1024 * 1024;
@Nested
- class TestDefault extends TestMaxFrameSize {
- TestDefault() {
+ class DefaultServerNestedIT extends TestMaxFrameSize {
+ DefaultServerNestedIT() {
serverType = ThriftServerType.getDefault();
}
}
@Nested
- class TestThreadedSelector extends TestMaxFrameSize {
- TestThreadedSelector() {
+ class ThreadedSelectorNestedIT extends TestMaxFrameSize {
+ ThreadedSelectorNestedIT() {
serverType = ThriftServerType.THREADED_SELECTOR;
}
}
@Nested
- class TestCustomHsHa extends TestMaxFrameSize {
- TestCustomHsHa() {
+ class CustomHsHaNestedIT extends TestMaxFrameSize {
+ CustomHsHaNestedIT() {
serverType = ThriftServerType.CUSTOM_HS_HA;
}
}
@Nested
- class TestThreadPool extends TestMaxFrameSize {
- TestThreadPool() {
+ class ThreadPoolNestedIT extends TestMaxFrameSize {
+ ThreadPoolNestedIT() {
serverType = ThriftServerType.THREADPOOL;
}
}
@Nested
- class TestSsl extends TestMaxFrameSize {
- TestSsl() {
- serverType = ThriftServerType.THREADPOOL;
+ class SslNestedIT extends TestMaxFrameSize {
+ SslNestedIT() {
+ serverType = ThriftServerType.SSL;
}
}
- protected abstract class TestMaxFrameSize {
+ protected abstract class TestMaxFrameSize extends ConfigurableMacBase {
- @Test
- public void testMaxFrameSizeLargerThanDefault() throws Exception {
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(2);
+ }
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
+ String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE);
+ cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+ cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+ if (serverType == ThriftServerType.SSL) {
+ configureForSsl(cfg,
+ getSslDir(createTestDir(this.getClass().getName() + "_" +
this.testName())));
+ }
+ }
+
+ private void testWithSpecificSize(final int testSize) throws Exception {
// Ingest with a value width greater than the thrift default size to
verify our setting works
- // for max frame wize
- try (AccumuloClient accumuloClient =
Accumulo.newClient().from(getClientProps()).build()) {
- String table = getUniqueNames(1)[0];
- ReadWriteIT.ingest(accumuloClient, 1, 1,
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
- table);
- ReadWriteIT.verify(accumuloClient, 1, 1,
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
- table);
+ // for max frame size
+ try (var accumuloClient =
Accumulo.newClient().from(cluster.getClientProperties()).build()) {
+ String table = getUniqueNames(1)[0] + "_" + serverType.name();
+ ReadWriteIT.ingest(accumuloClient, 1, 1, testSize, 0, table);
+ ReadWriteIT.verify(accumuloClient, 1, 1, testSize, 0, table);
}
}
+
+ // Messages bigger than the default size, but smaller than the configured
max should work. This
+ // means that we successfully were able to override the default values.
+ @Test
+ public void testFrameSizeLessThanConfiguredMax() throws Exception {
+ // just use a size a little bigger than the default that would not work
unless the server
+ // configuration worked
+ int testSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE + 100;
+ // just make sure it's less than what we set as the max, so we expect
this to work
+ assertTrue(testSize < CONFIGURED_MAX_FRAME_SIZE);
+ testWithSpecificSize(testSize);
+ }
+
+ // Messages bigger than the configured size should not work.
+ @Test
+ public void testFrameSizeGreaterThanConfiguredMax() throws Exception {
+ // ssl is weird seems to pass, at least for some values less than the
default max message size
+ // of 100MB; more troubleshooting might be needed to figure out how to
get max message
+ // configurability with ssl
+ assumeFalse(this instanceof SslNestedIT);
+
+ // just use a size a little bigger than the default that would not work
with the default
+ int testSize = CONFIGURED_MAX_FRAME_SIZE + 100;
+
+ // assume it hangs forever if it doesn't finish before the timeout
+ // if the timeout is too short, then we might get false negatives; in
other words, the test
+ // will still pass, but might not detect that the specific size
unexpectedly worked
+ assertThrows(AssertionError.class, () ->
assertTimeoutPreemptively(Duration.ofSeconds(15),
+ () -> testWithSpecificSize(testSize)));
+ }
+
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/util/Wait.java
b/test/src/main/java/org/apache/accumulo/test/util/Wait.java
index c7a470c70b..d73bd52866 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/Wait.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/Wait.java
@@ -38,7 +38,7 @@ public class Wait {
* @return the parsed value or the value from the onError function, if an
error occurred
*/
public static int getTimeoutFactor(ToIntFunction<NumberFormatException>
onError) {
- String timeoutString = System.getProperty("timeout.factor");
+ String timeoutString = System.getProperty("timeout.factor", "1");
try {
int factor = Integer.parseInt(timeoutString);
if (factor < 1) {