This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 12951be5d7 Modified AssignmentHandler to return false for Tablets with OpId (#3808) 12951be5d7 is described below commit 12951be5d78860a175355a0071f5cf53c72d5078 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Oct 5 12:11:49 2023 -0400 Modified AssignmentHandler to return false for Tablets with OpId (#3808) In #3425 the Manager was modified to set the TabletGoalState to UNASSIGNED for Tablets that have an OperationId. This changes the AssignmentHandler to return false in the same condition. This will cause both the TabletServer and the ScanServer to not host a Tablet when there is an OperationId. Closes #3741 --- pom.xml | 5 ++ .../apache/accumulo/tserver/AssignmentHandler.java | 6 ++ test/pom.xml | 4 + .../org/apache/accumulo/test/ScanServerIT.java | 93 ++++++++++++++++++++++ 4 files changed, 108 insertions(+) diff --git a/pom.xml b/pom.xml index 108fc5788e..7a5a7f1fde 100644 --- a/pom.xml +++ b/pom.xml @@ -629,6 +629,11 @@ <artifactId>objenesis</artifactId> <version>3.3</version> </dependency> + <dependency> + <groupId>org.opentest4j</groupId> + <artifactId>opentest4j</artifactId> + <version>1.2.0</version> + </dependency> <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-api-easymock</artifactId> diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index 873934f90a..7694747e3f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -285,6 +285,12 @@ class AssignmentHandler implements Runnable { return false; } + if (meta.getOperationId() != null) { + log.info(METADATA_ISSUE + "metadata entry has a FATE operation id {} {} {}", extent, loc, + meta.getOperationId()); + return false; + } + return true; } } diff --git a/test/pom.xml b/test/pom.xml index 2ffd6b1af4..0302e45a6d 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -198,6 +198,10 @@ <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> </dependency> + <dependency> + <groupId>org.opentest4j</groupId> + <artifactId>opentest4j</artifactId> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 7eef3d59dd..7fe25848b8 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -23,14 +23,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.fail; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Properties; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -48,14 +56,21 @@ import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.metadata.ServerAmpleImpl; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.util.Wait; @@ -66,6 +81,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.opentest4j.AssertionFailedError; import com.google.common.collect.Iterables; @@ -246,6 +262,83 @@ public class ScanServerIT extends SharedMiniClusterBase { } } + @Test + public void testScanTabletsWithOperationIds() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + setupTableWithHostingMix(client, tableName); + + // Unload all tablets + TableId tid = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + client.tableOperations().setTabletHostingGoal(tableName, new Range((Text) null, (Text) null), + TabletHostingGoal.ONDEMAND); + + // Wait for the tablets to be unloaded + Wait.waitFor(() -> ScanServerIT.getNumHostedTablets(client, tid.canonical()) == 0, 30_000, + 1_000); + + // Set operationIds on all the table's tablets so that they won't be loaded. + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, 1234L); + Ample ample = getCluster().getServerContext().getAmple(); + ServerAmpleImpl sai = (ServerAmpleImpl) ample; + try (TabletsMutator tm = sai.mutateTablets()) { + ample.readTablets().forTable(tid).build().forEach(meta -> { + tm.mutateTablet(meta.getExtent()).putOperation(opid).mutate(); + }); + } + + final List<Future<?>> futures = new ArrayList<>(); + final ExecutorService executor = Executors.newFixedThreadPool(4); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); + BatchScanner bscanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + + // Confirm that the ScanServer will not complete the scan + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + futures.add(executor.submit(() -> assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + Iterables.size(scanner); + }))); + + // Confirm that the TabletServer will not complete the scan + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + futures.add(executor.submit(() -> assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + Iterables.size(scanner); + }))); + + // Test the BatchScanner + bscanner.setRanges(Collections.singleton(new Range())); + + // Confirm that the ScanServer will not complete the scan + bscanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + futures.add(executor.submit(() -> assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + Iterables.size(bscanner); + }))); + + // Confirm that the TabletServer will not complete the scan + bscanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + futures.add(executor.submit(() -> assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { + Iterables.size(bscanner); + }))); + + UtilWaitThread.sleep(30_000); + + assertEquals(4, futures.size()); + futures.forEach(f -> { + try { + f.get(); + fail("Scanner should have timed out"); + } catch (ExecutionException e) { + assertEquals(AssertionFailedError.class, e.getCause().getClass()); + } catch (InterruptedException e) { + fail("Scan was interrupted"); + } + }); + } // when the scanner is closed, all open sessions should be closed + executor.shutdown(); + } + } + @Test public void testBatchScanWithTabletHostingMix() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {