This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new b1b2557f94 Use custom Transport Factory to set Transport message and frame size (#3737) b1b2557f94 is described below commit b1b2557f949e9212a1b1ca9b65f2d66c01a69edb Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Sep 1 12:04:27 2023 -0400 Use custom Transport Factory to set Transport message and frame size (#3737) Use a custom TFramedTransport.Factory implementation so that when getTransport is called, the frame and message size are set on the underlying configuration. This is a workaround for https://issues.apache.org/jira/browse/THRIFT-5732 Fixes #3731 Also: * Throw EOFException when TTransportException type is END_OF_FILE * Refactor ThriftMaxFrameSizeIT to cover testing messages bigger and smaller than the configured value and also to use separate mini dirs for each test * Include stack trace in TabletServerBatchWriter log message for debugging * Add default value for timeout.factor in Wait class to avoid error message and 24 timeout default in IDEs when the system property isn't set --------- Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../TabletServerBatchReaderIterator.java | 11 +- .../core/clientImpl/TabletServerBatchWriter.java | 2 +- .../core/rpc/AccumuloTFramedTransportFactory.java | 58 +++++++++ .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../apache/accumulo/core/rpc/ThriftUtilTest.java | 132 +++++++++++++++++++++ .../test/functional/ThriftMaxFrameSizeIT.java | 115 ++++++++++++------ .../java/org/apache/accumulo/test/util/Wait.java | 2 +- 7 files changed, 282 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 963a9f2c4a..b937f40a66 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import java.io.EOFException; import java.io.IOException; import java.time.Duration; import java.util.AbstractMap.SimpleImmutableEntry; @@ -407,7 +408,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value failures.putAll(tsFailures); } } - + } catch (EOFException e) { + fatalException = e; } catch (IOException e) { if (!TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) { synchronized (failures) { @@ -910,6 +912,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } catch (TTransportException e) { log.debug("Server : {} msg : {}", server, e.getMessage()); timeoutTracker.errorOccured(); + if (e.getType() == TTransportException.END_OF_FILE) { + // END_OF_FILE is used in TEndpointTransport when the + // maxMessageSize has been reached. 
+ EOFException eof = new EOFException(e.getMessage()); + eof.addSuppressed(e); + throw eof; + } throw new IOException(e); } catch (ThriftSecurityException e) { log.debug("Server : {} msg : {}", server, e.getMessage(), e); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index de66339886..fadc408bea 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -916,7 +916,7 @@ public class TabletServerBatchWriter implements AutoCloseable { span.end(); } } catch (IOException e) { - log.debug("failed to send mutations to {} : {}", location, e.getMessage()); + log.debug("failed to send mutations to {}", location, e); HashSet<TableId> tables = new HashSet<>(); for (KeyExtent ke : mutationBatch.keySet()) { diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java new file mode 100644 index 0000000000..85882f500e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/rpc/AccumuloTFramedTransportFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.rpc; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TFramedTransport; + +/** + * This is a workaround for the issue reported in https://issues.apache.org/jira/browse/THRIFT-5732 + * and can be removed once that issue is fixed. + */ +public class AccumuloTFramedTransportFactory extends TFramedTransport.Factory { + + private final int maxMessageSize; + + public AccumuloTFramedTransportFactory(int maxMessageSize) { + super(maxMessageSize); + this.maxMessageSize = maxMessageSize; + } + + @Override + public TTransport getTransport(TTransport base) throws TTransportException { + // The input parameter "base" is typically going to be a TSocket implementation + // that represents a connection between two Accumulo endpoints (client-server, + // or server-server). The base transport has a maxMessageSize which defaults to + // 100MB. The FramedTransport that is created by this factory adds a header to + // the message with payload size information. The FramedTransport has a default + // frame size of 16MB, but the TFramedTransport constructor sets the frame size + // to the frame size set on the underlying transport ("base" in this case). + // According to current Thrift docs, a message has to fit into 1 frame, so the + // frame size will be set to the value that is lower.
Prior to this class being + // created, we were only setting the frame size, so messages were capped at 100MB + // because that's the default maxMessageSize. Here we are setting the maxMessageSize + // and maxFrameSize to the same value on the "base" transport so that when the + // TFramedTransport object is created, it ends up using the values that we want. + base.getConfiguration().setMaxFrameSize(maxMessageSize); + base.getConfiguration().setMaxMessageSize(maxMessageSize); + return super.getTransport(base); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index d961188509..a448908f75 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -49,7 +49,6 @@ import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; -import org.apache.thrift.transport.layered.TFramedTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,8 +62,8 @@ public class ThriftUtil { private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class); private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory(); - private static final TFramedTransport.Factory transportFactory = - new TFramedTransport.Factory(Integer.MAX_VALUE); + private static final AccumuloTFramedTransportFactory transportFactory = + new AccumuloTFramedTransportFactory(Integer.MAX_VALUE); private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<>(); public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5"; @@ -186,7 +185,7 @@ public class ThriftUtil { int maxFrameSize1 = (int) maxFrameSize; TTransportFactory factory = factoryCache.get(maxFrameSize1); if (factory == null) { - factory = new 
TFramedTransport.Factory(maxFrameSize1); + factory = new AccumuloTFramedTransportFactory(maxFrameSize1); factoryCache.put(maxFrameSize1, factory); } return factory; diff --git a/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java b/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java new file mode 100644 index 0000000000..dd3df43eb7 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.rpc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.thrift.transport.TByteBuffer; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TFramedTransport; +import org.junit.jupiter.api.Test; + +public class ThriftUtilTest { + + public static final int FRAME_HDR_SIZE = 4; + public static final int MB1 = 1 * 1024 * 1024; + public static final int MB10 = 10 * 1024 * 1024; + public static final int MB100 = 100 * 1024 * 1024; + public static final int GB = 1 * 1024 * 1024 * 1024; + + @Test + public void testDefaultTFramedTransportFactory() throws TTransportException { + + // This test confirms that the default maxMessageSize in Thrift is 100MB + // even when we set the frame size to be 1GB + + TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(1024)); + + TFramedTransport.Factory factory = new TFramedTransport.Factory(GB); + TTransport framedTransport = factory.getTransport(underlyingTransport); + + assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB); + assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB100); + } + + @Test + public void testAccumuloTFramedTransportFactory() throws TTransportException { + + // This test confirms that our custom FramedTransportFactory sets the max + // message size and max frame size to the value that we want. 
+ + TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(1024)); + + AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(GB); + TTransport framedTransport = factory.getTransport(underlyingTransport); + + assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB); + assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), GB); + } + + @Test + public void testMessageSizeReadWriteSuccess() throws Exception { + + // This test creates a 10MB buffer in memory as the underlying transport, then + // creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize. It then + // writes 1MB - 4 bytes (to account for the frame header) to the transport and + // reads the data back out. + + TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(MB10)); + AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(MB1); + TTransport framedTransport = factory.getTransport(underlyingTransport); + assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1); + assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1); + + byte[] writeBuf = new byte[MB1 - FRAME_HDR_SIZE]; + Arrays.fill(writeBuf, (byte) 1); + framedTransport.write(writeBuf); + framedTransport.flush(); + + assertEquals(MB1, underlyingTransport.getByteBuffer().position()); + underlyingTransport.flip(); + assertEquals(0, underlyingTransport.getByteBuffer().position()); + assertEquals(MB1, underlyingTransport.getByteBuffer().limit()); + + byte[] readBuf = new byte[MB1]; + framedTransport.read(readBuf, 0, MB1); + } + + @Test + public void testMessageSizeWriteFailure() throws Exception { + + // This test creates a 10MB buffer in memory as the underlying transport, then + // creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize. It then + // writes 1MB + 100 bytes to the transport and reads them back; the read fails + // because the frame is larger than the configured frame and message size.
+ + TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(MB10)); + AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(MB1); + TTransport framedTransport = factory.getTransport(underlyingTransport); + assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1); + assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1); + + // Write more than 1MB to the TByteBuffer. Writing more data than the frame allows + // succeeds; the limit is only enforced on the read. + final int ourSize = MB1 + 100; + byte[] writeBuf = new byte[ourSize]; + Arrays.fill(writeBuf, (byte) 1); + framedTransport.write(writeBuf); + framedTransport.flush(); + + assertEquals(ourSize + FRAME_HDR_SIZE, underlyingTransport.getByteBuffer().position()); + underlyingTransport.flip(); + assertEquals(0, underlyingTransport.getByteBuffer().position()); + assertEquals(ourSize + FRAME_HDR_SIZE, underlyingTransport.getByteBuffer().limit()); + + byte[] readBuf = new byte[ourSize]; + var e = + assertThrows(TTransportException.class, () -> framedTransport.read(readBuf, 0, ourSize)); + assertEquals("Frame size (" + ourSize + ") larger than max length (" + MB1 + ")!", + e.getMessage()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java index 8b957d74f8..0a94d0a974 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java @@ -18,89 +18,128 @@ */ package org.apache.accumulo.test.functional; -import static org.apache.accumulo.test.functional.ConfigurableMacBase.configureForSsl; +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import
static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import java.time.Duration; import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TConfiguration; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -public class ThriftMaxFrameSizeIT extends AccumuloClusterHarness { +@Tag(MINI_CLUSTER_ONLY) +public class ThriftMaxFrameSizeIT { private ThriftServerType serverType; - @Override - protected Duration defaultTimeout() { - return Duration.ofMinutes(1); - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name()); - if (serverType == ThriftServerType.SSL) { - configureForSsl(cfg, - getSslDir(createTestDir(this.getClass().getName() + "_" + this.testName()))); - } - } + // use something other than TConfiguration.DEFAULT_MAX_FRAME_SIZE to make sure the override works + // small values seem to be insufficient for Accumulo, at least for this test + private static final int CONFIGURED_MAX_FRAME_SIZE = 32 * 1024 * 1024; @Nested - class TestDefault extends TestMaxFrameSize { - TestDefault() { + class DefaultServerNestedIT extends TestMaxFrameSize { + DefaultServerNestedIT() { serverType = ThriftServerType.getDefault(); } } @Nested - class TestThreadedSelector extends TestMaxFrameSize { - TestThreadedSelector() { + class ThreadedSelectorNestedIT extends TestMaxFrameSize { + ThreadedSelectorNestedIT() { serverType = ThriftServerType.THREADED_SELECTOR; } } @Nested - class TestCustomHsHa extends 
TestMaxFrameSize { - TestCustomHsHa() { + class CustomHsHaNestedIT extends TestMaxFrameSize { + CustomHsHaNestedIT() { serverType = ThriftServerType.CUSTOM_HS_HA; } } @Nested - class TestThreadPool extends TestMaxFrameSize { - TestThreadPool() { + class ThreadPoolNestedIT extends TestMaxFrameSize { + ThreadPoolNestedIT() { serverType = ThriftServerType.THREADPOOL; } } @Nested - class TestSsl extends TestMaxFrameSize { - TestSsl() { - serverType = ThriftServerType.THREADPOOL; + class SslNestedIT extends TestMaxFrameSize { + SslNestedIT() { + serverType = ThriftServerType.SSL; } } - protected abstract class TestMaxFrameSize { + protected abstract class TestMaxFrameSize extends ConfigurableMacBase { - @Test - public void testMaxFrameSizeLargerThanDefault() throws Exception { + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(2); + } + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name()); + String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE); + cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr); + cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr); + if (serverType == ThriftServerType.SSL) { + configureForSsl(cfg, + getSslDir(createTestDir(this.getClass().getName() + "_" + this.testName()))); + } + } + + private void testWithSpecificSize(final int testSize) throws Exception { // Ingest with a value width greater than the thrift default size to verify our setting works - // for max frame wize - try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { - String table = getUniqueNames(1)[0]; - ReadWriteIT.ingest(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, - table); - ReadWriteIT.verify(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, - table); + // for max frame size + try (var 
accumuloClient = Accumulo.newClient().from(cluster.getClientProperties()).build()) { + String table = getUniqueNames(1)[0] + "_" + serverType.name(); + ReadWriteIT.ingest(accumuloClient, 1, 1, testSize, 0, table); + ReadWriteIT.verify(accumuloClient, 1, 1, testSize, 0, table); } } + + // Messages bigger than the default size, but smaller than the configured max should work. This + // means that we successfully were able to override the default values. + @Test + public void testFrameSizeLessThanConfiguredMax() throws Exception { + // just use a size a little bigger than the default that would not work unless the server + // configuration worked + int testSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE + 100; + // just make sure it's less than what we set as the max, so we expect this to work + assertTrue(testSize < CONFIGURED_MAX_FRAME_SIZE); + testWithSpecificSize(testSize); + } + + // Messages bigger than the configured size should not work. + @Test + public void testFrameSizeGreaterThanConfiguredMax() throws Exception { + // With SSL, messages larger than the configured max still seem to succeed, at least for + // some sizes less than the default max message size of 100MB; more troubleshooting may be + // needed to figure out how to get max message configurability with SSL + assumeFalse(this instanceof SslNestedIT); + + // use a size a little bigger than the configured max, so we expect this to fail + int testSize = CONFIGURED_MAX_FRAME_SIZE + 100; + + // assume it hangs forever if it doesn't finish before the timeout + // if the timeout is too short, then we might get false negatives; in other words, the test + // will still pass, but might not detect that the specific size unexpectedly worked + assertThrows(AssertionError.class, () -> assertTimeoutPreemptively(Duration.ofSeconds(15), + () -> testWithSpecificSize(testSize))); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/util/Wait.java b/test/src/main/java/org/apache/accumulo/test/util/Wait.java index c7a470c70b..d73bd52866
100644 --- a/test/src/main/java/org/apache/accumulo/test/util/Wait.java +++ b/test/src/main/java/org/apache/accumulo/test/util/Wait.java @@ -38,7 +38,7 @@ public class Wait { * @return the parsed value or the value from the onError function, if an error occurred */ public static int getTimeoutFactor(ToIntFunction<NumberFormatException> onError) { - String timeoutString = System.getProperty("timeout.factor"); + String timeoutString = System.getProperty("timeout.factor", "1"); try { int factor = Integer.parseInt(timeoutString); if (factor < 1) {