This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 12951be5d7 Modified AssignmentHandler to return false for Tablets with 
OpId (#3808)
12951be5d7 is described below

commit 12951be5d78860a175355a0071f5cf53c72d5078
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Oct 5 12:11:49 2023 -0400

    Modified AssignmentHandler to return false for Tablets with OpId (#3808)
    
    In #3425 the Manager was modified to set the TabletGoalState to
    UNASSIGNED for Tablets that have an OperationId. This changes
    the AssignmentHandler to return false in the same condition.
    This will cause both the TabletServer and the ScanServer to not
    host a Tablet when there is an OperationId.
    
    Closes #3741
---
 pom.xml                                            |  5 ++
 .../apache/accumulo/tserver/AssignmentHandler.java |  6 ++
 test/pom.xml                                       |  4 +
 .../org/apache/accumulo/test/ScanServerIT.java     | 93 ++++++++++++++++++++++
 4 files changed, 108 insertions(+)

diff --git a/pom.xml b/pom.xml
index 108fc5788e..7a5a7f1fde 100644
--- a/pom.xml
+++ b/pom.xml
@@ -629,6 +629,11 @@
         <artifactId>objenesis</artifactId>
         <version>3.3</version>
       </dependency>
+      <dependency>
+        <groupId>org.opentest4j</groupId>
+        <artifactId>opentest4j</artifactId>
+        <version>1.2.0</version>
+      </dependency>
       <dependency>
         <groupId>org.powermock</groupId>
         <artifactId>powermock-api-easymock</artifactId>
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index 873934f90a..7694747e3f 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -285,6 +285,12 @@ class AssignmentHandler implements Runnable {
       return false;
     }
 
+    if (meta.getOperationId() != null) {
+      log.info(METADATA_ISSUE + "metadata entry has a FATE operation id {} {} 
{}", extent, loc,
+          meta.getOperationId());
+      return false;
+    }
+
     return true;
   }
 }
diff --git a/test/pom.xml b/test/pom.xml
index 2ffd6b1af4..0302e45a6d 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -198,6 +198,10 @@
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opentest4j</groupId>
+      <artifactId>opentest4j</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java 
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 7eef3d59dd..7fe25848b8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -23,14 +23,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -48,14 +56,21 @@ import 
org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.metadata.ServerAmpleImpl;
 import org.apache.accumulo.test.functional.ReadWriteIT;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.test.util.Wait;
@@ -66,6 +81,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.opentest4j.AssertionFailedError;
 
 import com.google.common.collect.Iterables;
 
@@ -246,6 +262,83 @@ public class ScanServerIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testScanTabletsWithOperationIds() throws Exception {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      setupTableWithHostingMix(client, tableName);
+
+      // Unload all tablets
+      TableId tid = 
TableId.of(client.tableOperations().tableIdMap().get(tableName));
+      client.tableOperations().setTabletHostingGoal(tableName, new 
Range((Text) null, (Text) null),
+          TabletHostingGoal.ONDEMAND);
+
+      // Wait for the tablets to be unloaded
+      Wait.waitFor(() -> ScanServerIT.getNumHostedTablets(client, 
tid.canonical()) == 0, 30_000,
+          1_000);
+
+      // Set operationIds on all the table's tablets so that they won't be 
loaded.
+      TabletOperationId opid = 
TabletOperationId.from(TabletOperationType.SPLITTING, 1234L);
+      Ample ample = getCluster().getServerContext().getAmple();
+      ServerAmpleImpl sai = (ServerAmpleImpl) ample;
+      try (TabletsMutator tm = sai.mutateTablets()) {
+        ample.readTablets().forTable(tid).build().forEach(meta -> {
+          tm.mutateTablet(meta.getExtent()).putOperation(opid).mutate();
+        });
+      }
+
+      final List<Future<?>> futures = new ArrayList<>();
+      final ExecutorService executor = Executors.newFixedThreadPool(4);
+      try (Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY);
+          BatchScanner bscanner = client.createBatchScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+
+        // Confirm that the ScanServer will not complete the scan
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        futures.add(executor.submit(() -> 
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+          Iterables.size(scanner);
+        })));
+
+        // Confirm that the TabletServer will not complete the scan
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        futures.add(executor.submit(() -> 
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+          Iterables.size(scanner);
+        })));
+
+        // Test the BatchScanner
+        bscanner.setRanges(Collections.singleton(new Range()));
+
+        // Confirm that the ScanServer will not complete the scan
+        bscanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        futures.add(executor.submit(() -> 
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+          Iterables.size(bscanner);
+        })));
+
+        // Confirm that the TabletServer will not complete the scan
+        bscanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        futures.add(executor.submit(() -> 
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+          Iterables.size(bscanner);
+        })));
+
+        UtilWaitThread.sleep(30_000);
+
+        assertEquals(4, futures.size());
+        futures.forEach(f -> {
+          try {
+            f.get();
+            fail("Scanner should have timed out");
+          } catch (ExecutionException e) {
+            assertEquals(AssertionFailedError.class, e.getCause().getClass());
+          } catch (InterruptedException e) {
+            fail("Scan was interrupted");
+          }
+        });
+      } // when the scanner is closed, all open sessions should be closed
+      executor.shutdown();
+    }
+  }
+
   @Test
   public void testBatchScanWithTabletHostingMix() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {

Reply via email to