This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 662227405690dd2a8676841d952e354d82de76f4
Merge: 1772379b04 3409d958bb
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Sep 7 16:50:40 2023 +0000

    Merge branch 'main' into elasticity

 .../TabletServerBatchReaderIterator.java           |  11 +-
 .../core/clientImpl/TabletServerBatchWriter.java   | 260 +++++++++++++++------
 .../apache/accumulo/core/clientImpl/Writer.java    | 125 ----------
 .../org/apache/accumulo/core/conf/Property.java    |  10 +-
 .../core/rpc/AccumuloTFramedTransportFactory.java  |  58 +++++
 .../org/apache/accumulo/core/rpc/ThriftUtil.java   |   7 +-
 .../apache/accumulo/core/rpc/ThriftUtilTest.java   | 132 +++++++++++
 .../accumulo/server/problems/ProblemReport.java    |  10 +-
 .../accumulo/server/problems/ProblemReports.java   |   5 +-
 .../server/tablets/UniqueNameAllocator.java        |  40 +++-
 .../accumulo/server/util/MetadataTableUtil.java    |  52 ++---
 .../accumulo/tserver/TabletClientHandler.java      |  97 ++++----
 .../org/apache/accumulo/test/BatchWriterIT.java    | 101 ++++++++
 .../accumulo/test/MetaConstraintRetryIT.java       |  13 +-
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 203 ++++++++++++++++
 .../test/functional/ThriftMaxFrameSizeIT.java      | 115 ++++++---
 .../java/org/apache/accumulo/test/util/Wait.java   |   2 +-
 17 files changed, 900 insertions(+), 341 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
index 242de8afbf,cd3a727ded..5c48549831
--- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
@@@ -22,13 -28,33 +28,36 @@@ import java.util.stream.Collectors
  
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.BatchWriterConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
++import org.apache.accumulo.core.client.admin.TabletHostingGoal;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
 -import org.apache.accumulo.core.clientImpl.TabletLocator;
++import org.apache.accumulo.core.clientImpl.ClientTabletCache;
++import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
++import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
+ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.rpc.ThriftUtil;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+ import 
org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException;
+ import org.apache.accumulo.core.tabletingest.thrift.TDurability;
+ import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
+ import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.test.constraints.NumericValueConstraint;
+ import org.apache.hadoop.io.Text;
+ import org.apache.thrift.TServiceClient;
  import org.junit.jupiter.api.Test;
  
+ import com.google.common.net.HostAndPort;
+ 
  public class BatchWriterIT extends AccumuloClusterHarness {
  
    @Override
@@@ -52,4 -78,73 +81,76 @@@
      }
    }
  
+   private static void update(ClientContext context, Mutation m, KeyExtent 
extent) throws Exception {
+ 
 -    TabletLocator.TabletLocation tabLoc = TabletLocator.getLocator(context, 
extent.tableId())
 -        .locateTablet(context, new Text(m.getRow()), false, true);
++    ClientTabletCache ctc = ClientTabletCache.getInstance(context, 
extent.tableId());
++    ctc.invalidateCache();
++    CachedTablet tabLoc =
++        ctc.findTablet(context, new Text(m.getRow()), true, 
LocationNeed.REQUIRED);
+ 
 -    var server = HostAndPort.fromString(tabLoc.getTserverLocation());
++    var server = 
HostAndPort.fromString(tabLoc.getTserverLocation().orElseThrow());
+ 
+     TabletIngestClientService.Iface client = null;
+     try {
+       client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, 
context);
+       client.update(TraceUtil.traceInfo(), context.rpcCreds(), 
extent.toThrift(), m.toThrift(),
+           TDurability.DEFAULT);
+     } catch (ThriftSecurityException e) {
+       throw new AccumuloSecurityException(e.user, e.code);
+     } finally {
+       ThriftUtil.returnClient((TServiceClient) client, context);
+     }
+   }
+ 
+   static String toString(Map.Entry<Key,Value> e) {
+     return e.getKey().getRow() + ":" + e.getKey().getColumnFamily() + ":"
+         + e.getKey().getColumnQualifier() + ":" + 
e.getKey().getColumnVisibility() + ":"
+         + e.getValue();
+   }
+ 
+   @Test
+   public void testSingleMutationWriteRPC() throws Exception {
+     // The batchwriter used to use this RPC and no longer does. This test exists to exercise the
+     // unused RPC until it is removed in 3.x. Older client versions of Accumulo 2.1.x may call this
+     // RPC.
+ 
+     String table = getUniqueNames(1)[0];
+     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+       NewTableConfiguration ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + 
"1",
+           NumericValueConstraint.class.getName()));
++      ntc.withInitialHostingGoal(TabletHostingGoal.ALWAYS);
+       c.tableOperations().create(table, ntc);
+ 
+       var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
+ 
+       Mutation m = new Mutation("r1");
+       m.put("f1", "q3", new Value("1"));
+       m.put("f1", "q4", new Value("2"));
+ 
+       update((ClientContext) c, m, new KeyExtent(tableId, null, null));
+ 
+       try (var scanner = c.createScanner(table)) {
+         var entries = 
scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList());
+         assertEquals(List.of("r1:f1:q3::1", "r1:f1:q4::2"), entries);
+       }
+ 
+       m = new Mutation("r1");
+       m.put("f1", "q3", new Value("5"));
+       m.put("f1", "q7", new Value("3"));
+ 
+       update((ClientContext) c, m, new KeyExtent(tableId, null, null));
+ 
+       try (var scanner = c.createScanner(table)) {
+         var entries = 
scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList());
+         assertEquals(List.of("r1:f1:q3::5", "r1:f1:q4::2", "r1:f1:q7::3"), 
entries);
+       }
+ 
+       var m2 = new Mutation("r2");
+       m2.put("f1", "q1", new Value("abc"));
+       assertThrows(ConstraintViolationException.class,
+           () -> update((ClientContext) c, m2, new KeyExtent(tableId, null, 
null)));
+     }
+ 
+   }
  }
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
index 8b957d74f8,0a94d0a974..d8ae2d2022
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ThriftMaxFrameSizeIT.java
@@@ -86,21 -80,66 +80,66 @@@ public class ThriftMaxFrameSizeIT 
      }
    }
  
-   protected abstract class TestMaxFrameSize {
+   protected abstract class TestMaxFrameSize extends ConfigurableMacBase {
  
-     @Test
-     public void testMaxFrameSizeLargerThanDefault() throws Exception {
+     @Override
+     protected Duration defaultTimeout() {
+       return Duration.ofMinutes(2);
+     }
  
+     @Override
+     public void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
 -      cfg.setNumTservers(1);
++      cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+       cfg.setProperty(Property.GENERAL_RPC_SERVER_TYPE, serverType.name());
+       String maxFrameSizeStr = Integer.toString(CONFIGURED_MAX_FRAME_SIZE);
+       cfg.setProperty(Property.GENERAL_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+       cfg.setProperty(Property.TSERV_MAX_MESSAGE_SIZE, maxFrameSizeStr);
+       if (serverType == ThriftServerType.SSL) {
+         configureForSsl(cfg,
+             getSslDir(createTestDir(this.getClass().getName() + "_" + 
this.testName())));
+       }
+     }
+ 
+     private void testWithSpecificSize(final int testSize) throws Exception {
        // Ingest with a value width greater than the thrift default size to 
verify our setting works
-       // for max frame wize
-       try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProps()).build()) {
-         String table = getUniqueNames(1)[0];
-         ReadWriteIT.ingest(accumuloClient, 1, 1, 
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
-             table);
-         ReadWriteIT.verify(accumuloClient, 1, 1, 
TConfiguration.DEFAULT_MAX_FRAME_SIZE + 1, 0,
-             table);
+       // for max frame size
+       try (var accumuloClient = 
Accumulo.newClient().from(cluster.getClientProperties()).build()) {
+         String table = getUniqueNames(1)[0] + "_" + serverType.name();
+         ReadWriteIT.ingest(accumuloClient, 1, 1, testSize, 0, table);
+         ReadWriteIT.verify(accumuloClient, 1, 1, testSize, 0, table);
        }
      }
+ 
+     // Messages bigger than the default size, but smaller than the configured 
max should work. This
+     // means that we successfully were able to override the default values.
+     @Test
+     public void testFrameSizeLessThanConfiguredMax() throws Exception {
+       // just use a size a little bigger than the default that would not work 
unless the server
+       // configuration worked
+       int testSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE + 100;
+       // just make sure it's less than what we set as the max, so we expect 
this to work
+       assertTrue(testSize < CONFIGURED_MAX_FRAME_SIZE);
+       testWithSpecificSize(testSize);
+     }
+ 
+     // Messages bigger than the configured size should not work.
+     @Test
+     public void testFrameSizeGreaterThanConfiguredMax() throws Exception {
+       // ssl is weird; it seems to pass, at least for some values less than the default max message size
+       // of 100MB; more troubleshooting might be needed to figure out how to get max message
+       // configurability with ssl
+       assumeFalse(this instanceof SslNestedIT);
+ 
+       // just use a size a little bigger than the configured max, which is expected not to work
+       int testSize = CONFIGURED_MAX_FRAME_SIZE + 100;
+ 
+       // assume it hangs forever if it doesn't finish before the timeout
+       // if the timeout is too short, then we might get false negatives; in 
other words, the test
+       // will still pass, but might not detect that the specific size 
unexpectedly worked
+       assertThrows(AssertionError.class, () -> 
assertTimeoutPreemptively(Duration.ofSeconds(15),
+           () -> testWithSpecificSize(testSize)));
+     }
+ 
    }
  
  }

Reply via email to