This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f682971dd0 Added Eventual Scan test in ComprehensiveIT (#4754)
f682971dd0 is described below

commit f682971dd0c90e6512061b5f00b6d83424140962
Author: Arbaaz Khan <[email protected]>
AuthorDate: Thu Aug 28 17:22:37 2025 -0400

    Added Eventual Scan test in ComprehensiveIT (#4754)
    
    * Added Eventual Scan test
---
 .../accumulo/test/ComprehensiveFlakyAmpleIT.java   | 21 +++++--
 .../accumulo/test/ComprehensiveFlakyFateIT.java    | 17 +++++-
 ...iveIT_SimpleSuite.java => ComprehensiveIT.java} | 20 ++++++-
 .../apache/accumulo/test/ComprehensiveITBase.java  | 68 +++++++++++++++++++++-
 4 files changed, 116 insertions(+), 10 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
index 100985e34e..71429006ff 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.ample.FlakyAmpleManager;
 import org.apache.accumulo.test.ample.FlakyAmpleTserver;
 import org.junit.jupiter.api.AfterAll;
@@ -33,12 +36,22 @@ import org.junit.jupiter.api.BeforeAll;
  * metadata updates using Ample.
  */
 public class ComprehensiveFlakyAmpleIT extends ComprehensiveITBase {
-  @BeforeAll
-  public static void setup() throws Exception {
-    SharedMiniClusterBase.startMiniClusterWithConfig((cfg, coreSite) -> {
+
+  private static class ComprehensiveFlakyAmpleITConfiguration
+      implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
       cfg.setServerClass(ServerType.MANAGER, r -> FlakyAmpleManager.class);
       cfg.setServerClass(ServerType.TABLET_SERVER, r -> 
FlakyAmpleTserver.class);
-    });
+    }
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    ComprehensiveFlakyAmpleITConfiguration c = new 
ComprehensiveFlakyAmpleITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
 
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       client.securityOperations().changeUserAuthorizations("root", 
AUTHORIZATIONS);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
index 7c31118d20..0e4691ea30 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.fate.FlakyFateManager;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -31,10 +34,20 @@ import org.junit.jupiter.api.BeforeAll;
  * {@link org.apache.accumulo.test.fate.FlakyFate} because it will run a lot 
of FATE operations.
  */
 public class ComprehensiveFlakyFateIT extends ComprehensiveITBase {
+  private static class ComprehensiveFlakyFateITConfiguration
+      implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+      cfg.setServerClass(ServerType.MANAGER, r -> FlakyFateManager.class);
+    }
+  }
+
   @BeforeAll
   public static void setup() throws Exception {
-    SharedMiniClusterBase.startMiniClusterWithConfig(
-        (cfg, coreSite) -> cfg.setServerClass(ServerType.MANAGER, r -> 
FlakyFateManager.class));
+    ComprehensiveFlakyFateITConfiguration c = new 
ComprehensiveFlakyFateITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
 
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       client.securityOperations().changeUserAuthorizations("root", 
AUTHORIZATIONS);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
similarity index 64%
rename from 
test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java
rename to test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
index 0cd25ea302..3ffb5cf003 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
@@ -19,20 +19,34 @@
 package org.apache.accumulo.test;
 
 import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY;
+import static org.apache.accumulo.harness.SharedMiniClusterBase.getClientProps;
+import static org.apache.accumulo.test.ComprehensiveITBase.AUTHORIZATIONS;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 
 @Tag(SUNNY_DAY)
-public class ComprehensiveIT_SimpleSuite extends ComprehensiveITBase {
+public class ComprehensiveIT extends ComprehensiveITBase {
+
+  private static class ComprehensiveITConfiguration implements 
MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+    }
+  }
+
   @BeforeAll
   public static void setup() throws Exception {
-    SharedMiniClusterBase.startMiniCluster();
-
+    ComprehensiveITConfiguration c = new ComprehensiveITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       client.securityOperations().changeUserAuthorizations("root", 
AUTHORIZATIONS);
     }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
index 9517229ba2..a6406a7556 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -33,10 +34,12 @@ import java.io.InputStreamReader;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -77,6 +80,7 @@ import 
org.apache.accumulo.core.client.security.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.client.summary.CountingSummarizer;
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Condition;
@@ -97,7 +101,9 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.NamespacePermission;
 import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -119,7 +125,67 @@ public abstract class ComprehensiveITBase extends 
SharedMiniClusterBase {
   public static final String DOG_AND_CAT = "DOG&CAT";
   static final Authorizations AUTHORIZATIONS = new Authorizations("CAT", 
"DOG");
 
-  private static final Logger log = 
LoggerFactory.getLogger(ComprehensiveIT_SimpleSuite.class);
+  private static final Logger log = 
LoggerFactory.getLogger(ComprehensiveIT.class);
+
+  @Test
+  public void testEventualScan() throws Exception {
+    Properties props = new Properties();
+    props.putAll(getClientProps());
+    props.put(ClientProperty.SCAN_SERVER_SELECTOR.getKey(),
+        ConfigurableScanServerSelector.class.getName());
+    props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + 
"profiles",
+        "[{'isDefault':true,'timeToWaitForScanServers' : 
'10d','maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'attemptPlans':[{\"servers\":\"100%\",
 \"busyTimeout\":\"3ms\"}]}]");
+
+    try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+      String table = getUniqueNames(1)[0];
+      client.tableOperations().create(table);
+      getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
+      Wait.waitFor(
+          () -> 
!client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty());
+
+      write(client, table, generateMutations(0, 100, tr -> true));
+      verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100), scanner 
-> {});
+      verifyData(client, table, AUTHORIZATIONS, Collections.emptySortedMap(),
+          scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+      client.tableOperations().flush(table, null, null, true);
+      Wait.waitFor(() -> {
+        try (var scanner = client.createScanner(table, AUTHORIZATIONS)) {
+          scanner.setConsistencyLevel(EVENTUAL);
+          return scan(scanner).size() >= 100;
+        }
+      });
+
+      // should see all data that was flushed in eventual scan
+      verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100),
+          scanner -> scanner.setConsistencyLevel(EVENTUAL));
+      // should not see data with col vis set
+      verifyData(client, table, Authorizations.EMPTY, generateKeys(0, 100, tr 
-> tr.vis.isEmpty()),
+          scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+      // write some more rows after 100 and verify those are not seen by 
eventual scan until table
+      // is flushed.
+      write(client, table, generateMutations(100, 200, tr -> true));
+      verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100),
+          scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+      client.tableOperations().flush(table, null, null, true);
+      // wait for the eventual scan to see the new data
+      final int initialSize = generateKeys(0, 100).size();
+      Wait.waitFor(() -> {
+        try (var scanner = client.createScanner(table, AUTHORIZATIONS)) {
+          scanner.setConsistencyLevel(EVENTUAL);
+          return scan(scanner).size() > initialSize;
+        }
+      });
+
+      verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 200),
+          scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+    } finally {
+      getCluster().getClusterControl().stop(ServerType.SCAN_SERVER);
+    }
+  }
 
   @Test
   public void testBulkImport() throws Exception {

Reply via email to