This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 662227405690dd2a8676841d952e354d82de76f4 Merge: 1772379b04 3409d958bb Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Sep 7 16:50:40 2023 +0000 Merge branch 'main' into elasticity .../TabletServerBatchReaderIterator.java | 11 +- .../core/clientImpl/TabletServerBatchWriter.java | 260 +++++++++++++++------ .../apache/accumulo/core/clientImpl/Writer.java | 125 ---------- .../org/apache/accumulo/core/conf/Property.java | 10 +- .../core/rpc/AccumuloTFramedTransportFactory.java | 58 +++++ .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../apache/accumulo/core/rpc/ThriftUtilTest.java | 132 +++++++++++ .../accumulo/server/problems/ProblemReport.java | 10 +- .../accumulo/server/problems/ProblemReports.java | 5 +- .../server/tablets/UniqueNameAllocator.java | 40 +++- .../accumulo/server/util/MetadataTableUtil.java | 52 ++--- .../accumulo/tserver/TabletClientHandler.java | 97 ++++---- .../org/apache/accumulo/test/BatchWriterIT.java | 101 ++++++++ .../accumulo/test/MetaConstraintRetryIT.java | 13 +- .../apache/accumulo/test/WriteAfterCloseIT.java | 203 ++++++++++++++++ .../test/functional/ThriftMaxFrameSizeIT.java | 115 ++++++--- .../java/org/apache/accumulo/test/util/Wait.java | 2 +- 17 files changed, 900 insertions(+), 341 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java index 242de8afbf,cd3a727ded..5c48549831 --- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java @@@ -22,13 -28,33 +28,36 @@@ import java.util.stream.Collectors import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; ++import org.apache.accumulo.core.client.admin.TabletHostingGoal; + import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.TabletLocator; ++import org.apache.accumulo.core.clientImpl.ClientTabletCache; ++import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet; ++import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed; + import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.rpc.ThriftUtil; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; + import org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException; + import org.apache.accumulo.core.tabletingest.thrift.TDurability; + import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; + import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.test.constraints.NumericValueConstraint; + import org.apache.hadoop.io.Text; + import org.apache.thrift.TServiceClient; import org.junit.jupiter.api.Test; + import com.google.common.net.HostAndPort; + public class BatchWriterIT extends AccumuloClusterHarness { @Override @@@ -52,4 -78,73 +81,76 @@@ } } + private static void update(ClientContext context, Mutation m, KeyExtent extent) throws Exception { + - TabletLocator.TabletLocation tabLoc = TabletLocator.getLocator(context, extent.tableId()) - .locateTablet(context, new Text(m.getRow()), false, true); ++ ClientTabletCache ctc = ClientTabletCache.getInstance(context, extent.tableId()); ++ ctc.invalidateCache(); ++ CachedTablet tabLoc = ++ ctc.findTablet(context, new Text(m.getRow()), true, LocationNeed.REQUIRED); + - var server = HostAndPort.fromString(tabLoc.getTserverLocation()); ++ var server = HostAndPort.fromString(tabLoc.getTserverLocation().orElseThrow()); + + TabletIngestClientService.Iface client = null; + try { + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, context); + client.update(TraceUtil.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(), + TDurability.DEFAULT); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code); + } finally { + ThriftUtil.returnClient((TServiceClient) client, context); + } + } + + static String toString(Map.Entry<Key,Value> e) { + return e.getKey().getRow() + ":" + e.getKey().getColumnFamily() + ":" + + e.getKey().getColumnQualifier() + ":" + e.getKey().getColumnVisibility() + ":" + + e.getValue(); + } + + @Test + public void testSingleMutationWriteRPC() throws Exception { + // The batchwriter used to use this RPC and no longer does. This test exist to exercise the + // unused RPC until its removed in 3.x. Older client versions of Accumulo 2.1.x may call this + // RPC. + + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", + NumericValueConstraint.class.getName())); ++ ntc.withInitialHostingGoal(TabletHostingGoal.ALWAYS); + c.tableOperations().create(table, ntc); + + var tableId = TableId.of(c.tableOperations().tableIdMap().get(table)); + + Mutation m = new Mutation("r1"); + m.put("f1", "q3", new Value("1")); + m.put("f1", "q4", new Value("2")); + + update((ClientContext) c, m, new KeyExtent(tableId, null, null)); + + try (var scanner = c.createScanner(table)) { + var entries = scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList()); + assertEquals(List.of("r1:f1:q3::1", "r1:f1:q4::2"), entries); + } + + m = new Mutation("r1"); + m.put("f1", "q3", new Value("5")); + m.put("f1", "q7", new Value("3")); + + update((ClientContext) c, m, new KeyExtent(tableId, null, null)); + + try (var scanner = c.createScanner(table)) { + var entries = scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList()); + assertEquals(List.of("r1:f1:q3::5", "r1:f1:q4::2", "r1:f1:q7::3"), entries); + } + + var m2 = new Mutation("r2"); + m2.put("f1", "q1", new Value("abc")); + assertThrows(ConstraintViolationException.class, + () -> update((ClientContext) c, m2, new KeyExtent(tableId, null, null))); + } + + } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java index 8b957d74f8,0a94d0a974..d8ae2d2022 --- a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java @@@ -86,21 -80,66 +80,66 @@@ public class ThriftMaxFrameSizeIT } } - protected abstract class TestMaxFrameSize { + protected abstract class TestMaxFrameSize extends ConfigurableMacBase { - @Test - public void testMaxFrameSizeLargerThanDefault() throws Exception { + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(2); + } + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); ++ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name()); + String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE); + cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr); + cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr); + if (serverType == ThriftServerType.SSL) { + configureForSsl(cfg, + getSslDir(createTestDir(this.getClass().getName() + "_" + this.testName()))); + } + } + + private void testWithSpecificSize(final int testSize) throws Exception { // Ingest with a value width greater than the thrift default size to verify our setting works - // for max frame wize - try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { - String table = getUniqueNames(1)[0]; - ReadWriteIT.ingest(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, - table); - ReadWriteIT.verify(accumuloClient, 1, 1, TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0, - table); + // for max frame size + try (var accumuloClient = Accumulo.newClient().from(cluster.getClientProperties()).build()) { + String table = getUniqueNames(1)[0] + "_" + serverType.name(); + ReadWriteIT.ingest(accumuloClient, 1, 1, testSize, 0, table); + ReadWriteIT.verify(accumuloClient, 1, 1, testSize, 0, table); } } + + // Messages bigger than the default size, but smaller than the configured max should work. This + // means that we successfully were able to override the default values. + @Test + public void testFrameSizeLessThanConfiguredMax() throws Exception { + // just use a size a little bigger than the default that would not work unless the server + // configuration worked + int testSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE + 100; + // just make sure it's less than what we set as the max, so we expect this to work + assertTrue(testSize < CONFIGURED_MAX_FRAME_SIZE); + testWithSpecificSize(testSize); + } + + // Messages bigger than the configured size should not work. + @Test + public void testFrameSizeGreaterThanConfiguredMax() throws Exception { + // ssl is weird seems to pass, at least for some values less than the default max message size + // of 100MB; more troubleshooting might be needed to figure out how to get max message + // configurability with ssl + assumeFalse(this instanceof SslNestedIT); + + // just use a size a little bigger than the default that would not work with the default + int testSize = CONFIGURED_MAX_FRAME_SIZE + 100; + + // assume it hangs forever if it doesn't finish before the timeout + // if the timeout is too short, then we might get false negatives; in other words, the test + // will still pass, but might not detect that the specific size unexpectedly worked + assertThrows(AssertionError.class, () -> assertTimeoutPreemptively(Duration.ofSeconds(15), + () -> testWithSpecificSize(testSize))); + } + } }