This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8cd904138a fixes race condition w/ split,compaction relative to 
offline (#4629)
8cd904138a is described below

commit 8cd904138a85197a03dc71542c6a0410400c8505
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Jun 3 16:09:51 2024 -0400

    fixes race condition w/ split,compaction relative to offline (#4629)
    
    This commit wraps up #3412 and includes the following changes:
    
     * compactors now cancel running compactions when the table state is no 
longer online; this avoids doing work that will never be used
     * coordinator will no longer start committing compactions when the tablet 
is offline; this avoids a race condition w/ offline+wait of table
     * fixed bug found in tablet refresher where it was not properly handling 
concurrent tablet unloads (found by testing concurrent compaction and offline)
     * compaction fate operation that drives table compaction will now fail 
when table is offline (this change may be needed in older versions)
     * reordered when the split fate operation checks for table offline to 
avoid a race condition with offline+wait of table
     * added multiple ITs to test running split and compaction concurrently 
with offline table operation
---
 .../org/apache/accumulo/compactor/Compactor.java   |   8 ++
 .../coordinator/CompactionCoordinator.java         |  14 +++
 .../manager/tableOps/bulkVer2/TabletRefresher.java |  49 ++++++++-
 .../manager/tableOps/compact/CompactionDriver.java |   6 ++
 .../split/AllocateDirsAndEnsureOnline.java         | 110 +++++++++++++++++++++
 .../accumulo/manager/tableOps/split/PreSplit.java  |  29 +-----
 .../accumulo/manager/tableOps/split/SplitInfo.java |   2 +-
 .../apache/accumulo/test/fate/ManagerRepoIT.java   |  52 ++++++++++
 .../accumulo/test/functional/CompactionIT.java     |  94 +++++++++++++++++-
 .../apache/accumulo/test/functional/SplitIT.java   |  52 +++++++++-
 10 files changed, 379 insertions(+), 37 deletions(-)

diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 4c5357e35a..d05324da53 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -75,6 +75,7 @@ import org.apache.accumulo.core.lock.ServiceLockData;
 import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -204,6 +205,13 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
           return;
         }
 
+        var tableState = getContext().getTableState(extent.tableId());
+        if (tableState != TableState.ONLINE) {
+          LOG.info("Cancelling compaction {} because table state is {}", ecid, 
tableState);
+          JOB_HOLDER.cancel(job.getExternalCompactionId());
+          return;
+        }
+
         if (job.getKind() == TCompactionKind.USER) {
 
           var cconf =
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 7642003985..f9872c3687 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -740,6 +740,20 @@ public class CompactionCoordinator
     var tabletMeta =
         ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, 
COMPACTED, OPID);
 
+    var tableState = manager.getContext().getTableState(extent.tableId());
+    if (tableState != TableState.ONLINE) {
+      // Its important this check is done after the compaction id is set in 
the metadata table to
+      // avoid race conditions with the client code that waits for tables to 
go offline. That code
+      // looks for compaction ids in the metadata table after setting the 
table state. When that
+      // client code sees nothing for a tablet its important that nothing will 
changes the tablets
+      // files after that point in time which this check ensure.
+      LOG.debug("Not committing compaction {} for {} because of table state 
{}", ecid, extent,
+          tableState);
+      // cleanup metadata table and files related to the compaction
+      compactionsFailed(Map.of(ecid, extent));
+      return;
+    }
+
     if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
       return;
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index cb963e583a..8bff563391 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -23,9 +23,11 @@ import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -35,6 +37,7 @@ import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.metadata.TServerInstance;
@@ -106,8 +109,8 @@ public class TabletRefresher {
 
         // Ask tablet server to reload the metadata for these tablets. The 
tablet server returns
         // the list of extents it was hosting but was unable to refresh (the 
tablets could be in
-        // the process of loading). If it is not currently hosting the tablet 
it treats that as
-        // refreshed and does not return anything for it.
+        // the process of loading). If it is not currently hosting the tablet 
it does not return
+        // anything for it.
         Future<List<TKeyExtent>> future = threadPool
             .submit(() -> sendSyncRefreshRequest(context, logId, 
entry.getKey(), entry.getValue()));
 
@@ -141,6 +144,48 @@ public class TabletRefresher {
             .removeIf(location -> 
!liveTservers.contains(location.getServerInstance()));
       }
 
+      if (!refreshesNeeded.isEmpty()) {
+        // look for any tablets where the location changed, these tablets will 
no longer need a
+        // refresh because when the tablet loads at the new location it will 
see the new tablet
+        // metadata
+        HashMap<KeyExtent,TabletMetadata.Location> prevLocations = new 
HashMap<>();
+        refreshesNeeded.forEach((loc, extents) -> {
+          for (TKeyExtent te : extents) {
+            var extent = KeyExtent.fromThrift(te);
+            prevLocations.put(extent, loc);
+          }
+        });
+
+        // Build a map of tablets that exist and their current location. No 
need to includes tablets
+        // that no longer exists or do not have a location as later logic is 
ok w/ these being null.
+        HashMap<KeyExtent,TabletMetadata.Location> currLocations = new 
HashMap<>();
+        try (var tablets =
+            
context.getAmple().readTablets().forTablets(prevLocations.keySet(), 
Optional.empty())
+                .fetch(ColumnType.LOCATION).build()) {
+          tablets.forEach(tablet -> {
+            if (tablet.getLocation() != null) {
+              currLocations.put(tablet.getExtent(), tablet.getLocation());
+            }
+          });
+        }
+
+        refreshesNeeded.clear();
+
+        var finalrefreshesNeeded = refreshesNeeded;
+        // rebuild refreshesNeeded only including those where the location is 
still the same
+        prevLocations.forEach((extent, prevLoc) -> {
+          var currLoc = currLocations.get(extent);
+          // currLoc may be null and this is ok because it should not be equal 
then
+          if (prevLoc.equals(currLoc)) {
+            finalrefreshesNeeded.computeIfAbsent(currLoc, k -> new 
ArrayList<>())
+                .add(extent.toThrift());
+          } else {
+            log.trace("The location of {} changed from {} to {}, so refresh no 
longer needed",
+                extent, prevLoc, currLoc);
+          }
+        });
+      }
+
       if (!refreshesNeeded.isEmpty()) {
         try {
           retry.waitForNextAttempt(log, logId + " waiting for " + 
refreshesNeeded.size()
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index d1ace6c816..74fd66b49d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.logging.TabletLogger;
+import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -113,6 +114,11 @@ class CompactionDriver extends ManagerRepo {
           TableOperationsImpl.TABLE_DELETED_MSG);
     }
 
+    if (manager.getContext().getTableState(tableId) != TableState.ONLINE) {
+      throw new AcceptableThriftTableOperationException(tableId.canonical(), 
null,
+          TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, "The 
table is not online.");
+    }
+
     long t1 = System.currentTimeMillis();
 
     int tabletsToWaitFor = updateAndCheckTablets(manager, fateId);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
new file mode 100644
index 0000000000..0b090cbcd2
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AllocateDirsAndEnsureOnline extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger log = LoggerFactory.getLogger(PreSplit.class);
+
+  private final SplitInfo splitInfo;
+
+  public AllocateDirsAndEnsureOnline(SplitInfo splitInfo) {
+    this.splitInfo = splitInfo;
+  }
+
+  @Override
+  public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+    // This check of table state is done after setting the operation id to 
avoid a race condition
+    // with the client code that waits for a table to go offline. That client 
code sets the table
+    // state and then scans the metadata table looking for split operations 
ids. If split checks
+    // tables state before setting the opid then there is race condition with 
the client. Setting it
+    // after ensures that in the case when the client does not see any split 
op id in the metadata
+    // table that it knows that any splits starting after that point in time 
will not complete. This
+    // order is needed because the split fate operation does not acquire a 
table lock in zookeeper.
+    if (manager.getContext().getTableState(splitInfo.getOriginal().tableId())
+        != TableState.ONLINE) {
+
+      var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId);
+
+      // attempt to delete the operation id
+      try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
+
+        Ample.RejectionHandler rejectionHandler = new Ample.RejectionHandler() 
{
+
+          @Override
+          public boolean callWhenTabletDoesNotExists() {
+            return true;
+          }
+
+          @Override
+          public boolean test(TabletMetadata tabletMetadata) {
+            // if the tablet no longer exists or our operation id is not set 
then consider a success
+            return tabletMetadata == null || 
!opid.equals(tabletMetadata.getOperationId());
+          }
+        };
+
+        
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid)
+            
.requireAbsentLocation().requireAbsentLogs().deleteOperation().submit(rejectionHandler);
+
+        var result = tabletsMutator.process().get(splitInfo.getOriginal());
+
+        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
+          throw new IllegalStateException(
+              "Failed to delete operation id " + splitInfo.getOriginal());
+        }
+      }
+
+      throw new AcceptableThriftTableOperationException(
+          splitInfo.getOriginal().tableId().canonical(), null, 
TableOperation.SPLIT,
+          TableOperationExceptionType.OFFLINE,
+          "Unable to split tablet because the table is offline");
+    } else {
+      // Create the dir name here for the next step. If the next step fails it 
will always have the
+      // same dir name each time it runs again making it idempotent.
+      List<String> dirs = new ArrayList<>();
+
+      splitInfo.getSplits().forEach(split -> {
+        String dirName = 
TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split);
+        dirs.add(dirName);
+        log.trace("{} allocated dir name {}", fateId, dirName);
+      });
+      return new UpdateTablets(splitInfo, dirs);
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index 906e953f45..6d89878f95 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -23,20 +23,14 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedSet;
 
-import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -44,7 +38,6 @@ import 
org.apache.accumulo.core.metadata.schema.TabletOperationId;
 import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,15 +82,6 @@ public class PreSplit extends ManagerRepo {
         return 1000;
       }
     } else {
-      // do not set the operation id on the tablet if the table is offline
-      if (manager.getContext().getTableState(splitInfo.getOriginal().tableId())
-          != TableState.ONLINE) {
-        throw new AcceptableThriftTableOperationException(
-            splitInfo.getOriginal().tableId().canonical(), null, 
TableOperation.SPLIT,
-            TableOperationExceptionType.OFFLINE,
-            "Unable to split tablet because the table is offline");
-      }
-
       try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
 
         
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
@@ -155,18 +139,7 @@ public class PreSplit extends ManagerRepo {
         "Tablet unexpectedly had walogs %s %s %s", fateId, 
tabletMetadata.getLogs(),
         tabletMetadata.getExtent());
 
-    // Create the dir name here for the next step. If the next step fails it 
will always have the
-    // same dir name each time it runs again making it idempotent.
-
-    List<String> dirs = new ArrayList<>();
-
-    splitInfo.getSplits().forEach(split -> {
-      String dirName = 
TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split);
-      dirs.add(dirName);
-      log.trace("{} allocated dir name {}", fateId, dirName);
-    });
-
-    return new UpdateTablets(splitInfo, dirs);
+    return new AllocateDirsAndEnsureOnline(splitInfo);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
index b8f8c7adff..7d97e6a34e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
@@ -37,7 +37,7 @@ public class SplitInfo implements Serializable {
   private final byte[] endRow;
   private final byte[][] splits;
 
-  SplitInfo(KeyExtent extent, SortedSet<Text> splits) {
+  public SplitInfo(KeyExtent extent, SortedSet<Text> splits) {
     this.tableId = extent.tableId();
     this.prevEndRow = extent.prevEndRow() == null ? null : 
TextUtil.getBytes(extent.prevEndRow());
     this.endRow = extent.endRow() == null ? null : 
TextUtil.getBytes(extent.endRow());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
index 978fb3c491..e49de2fa9f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -28,13 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import 
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
@@ -55,10 +58,13 @@ import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
 import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
 import org.apache.accumulo.manager.tableOps.merge.MergeTablets;
 import org.apache.accumulo.manager.tableOps.merge.ReserveTablets;
+import org.apache.accumulo.manager.tableOps.split.AllocateDirsAndEnsureOnline;
 import org.apache.accumulo.manager.tableOps.split.FindSplits;
 import org.apache.accumulo.manager.tableOps.split.PreSplit;
+import org.apache.accumulo.manager.tableOps.split.SplitInfo;
 import org.apache.accumulo.test.ample.metadata.TestAmple;
 import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -134,6 +140,52 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testSplitOffline() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    // This test ensures a repo involved in splitting a tablet handles an 
offline table correctly
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      TestAmple.createMetadataTable(client, metadataTable);
+
+      // create a new table that is initially offline
+      client.tableOperations().create(userTable, new 
NewTableConfiguration().createOffline());
+
+      TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
+
+      testAmple.createMetadataFromExisting(client, tableId,
+          not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
+
+      var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+      KeyExtent extent = new KeyExtent(tableId, null, null);
+
+      // manually set an operation id on the tablet
+      var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId);
+      testAmple.mutateTablet(extent)
+          .putOperation(TabletOperationId.from(TabletOperationType.SPLITTING, 
fateId)).mutate();
+
+      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+
+      assertEquals(opid, testAmple.readTablet(extent).getOperationId());
+
+      var eoRepo = new AllocateDirsAndEnsureOnline(
+          new SplitInfo(extent, new TreeSet<>(List.of(new Text("sp1")))));
+
+      // The repo should delete the opid and throw an exception
+      assertThrows(ThriftTableOperationException.class, () -> 
eoRepo.call(fateId, manager));
+
+      // the operation id should have been cleaned up before the exception was 
thrown
+      assertNull(testAmple.readTablet(extent).getOperationId());
+    }
+  }
+
   @Test
   public void testFindSplitsUnsplittable() throws Exception {
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index bd26f0a469..e110786f15 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -34,6 +34,7 @@ import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -43,9 +44,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
@@ -96,6 +100,7 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
 import org.apache.accumulo.test.compaction.CompactionExecutorIT.TestPlanner;
@@ -1178,7 +1183,94 @@ public class CompactionIT extends AccumuloClusterHarness 
{
     ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), 
table1);
   }
 
-  private void writeRows(ClientContext client, String tableName, int rows, 
boolean wait)
+  @Test
+  public void testOfflineAndCompactions() throws Exception {
+    var uniqueNames = getUniqueNames(1);
+    String table = uniqueNames[0];
+
+    // This test exercises concurrent compactions and table offline.
+
+    try (final AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      SortedSet<Text> splits = new TreeSet<>();
+      for (int i = 1; i < 32; i++) {
+        splits.add(new Text(String.format("r:%04d", i)));
+      }
+
+      client.tableOperations().create(table, new 
NewTableConfiguration().withSplits(splits));
+      writeRows(client, table, 33, true);
+      // create two files per tablet
+      writeRows(client, table, 33, true);
+
+      var ctx = getCluster().getServerContext();
+      var tableId = ctx.getTableId(table);
+
+      // verify assumptions of test, expect all tablets to have files
+      var files0 = getFiles(ctx, tableId);
+      assertEquals(32, files0.size());
+      assertFalse(files0.values().stream().anyMatch(Set::isEmpty));
+
+      // lower the tables compaction ratio to cause system compactions
+      client.tableOperations().setProperty(table, 
Property.TABLE_MAJC_RATIO.getKey(), "1");
+
+      // start a bunch of compactions in the background
+      var executor = Executors.newCachedThreadPool();
+      List<Future<?>> futures = new ArrayList<>();
+      // start user compactions on a subset of the tables tablets, system 
compactions should attempt
+      // to run on all tablets. With concurrency should get a mix.
+      for (int i = 1; i < 20; i++) {
+        var startRow = new Text(String.format("r:%04d", i - 1));
+        var endRow = new Text(String.format("r:%04d", i));
+        futures.add(executor.submit(() -> {
+          CompactionConfig config = new CompactionConfig();
+          config.setWait(true);
+          config.setStartRow(startRow);
+          config.setEndRow(endRow);
+          client.tableOperations().compact(table, config);
+          return null;
+        }));
+      }
+
+      log.debug("Waiting for offline");
+      // take tablet offline while there are concurrent compactions
+      client.tableOperations().offline(table, true);
+
+      // grab a snapshot of all the tablets files after waiting for offline, 
do not expect any
+      // tablets files to change at this point
+      var files1 = getFiles(ctx, tableId);
+
+      // wait for the background compactions
+      log.debug("Waiting for futures");
+      for (var future : futures) {
+        try {
+          future.get();
+        } catch (ExecutionException ee) {
+          // its ok if some of the compactions fail because the table was 
concurrently taken offline
+          assertTrue(ee.getMessage().contains("is offline"));
+        }
+      }
+
+      // grab a second snapshot of the tablets files after all the background 
operations completed
+      var files2 = getFiles(ctx, tableId);
+
+      // do not expect the files to have changed after the offline operation 
returned.
+      assertEquals(files1, files2);
+
+      executor.shutdown();
+    }
+  }
+
+  private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, 
TableId tableId) {
+    Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>();
+    try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) 
{
+      for (var tablet : tablets) {
+        files.put(tablet.getExtent(), tablet.getFiles());
+      }
+    }
+    return files;
+  }
+
+  private void writeRows(AccumuloClient client, String tableName, int rows, 
boolean wait)
       throws Exception {
     try (BatchWriter bw = client.createBatchWriter(tableName)) {
       for (int i = 0; i < rows; i++) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 1985b96b32..f444eababe 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -25,6 +25,7 @@ import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -42,6 +43,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -345,6 +348,15 @@ public class SplitIT extends AccumuloClusterHarness {
 
   @Test
   public void concurrentSplit() throws Exception {
+    concurrentSplit(false);
+  }
+
+  @Test
+  public void concurrentSplitAndOffline() throws Exception {
+    concurrentSplit(true);
+  }
+
+  public void concurrentSplit(boolean offlineTable) throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 
       final String tableName = getUniqueNames(1)[0];
@@ -364,15 +376,15 @@ public class SplitIT extends AccumuloClusterHarness {
       ExecutorService es = Executors.newFixedThreadPool(10);
       final int totalFutures = 100;
       final int splitsPerFuture = 4;
-      final Set<Text> totalSplits = new HashSet<>();
+      final Set<Text> totalSplits = new ConcurrentSkipListSet<>();
       List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
       for (int i = 0; i < totalFutures; i++) {
         final Pair<Integer,Integer> splitBounds = 
getRandomSplitBounds(numRows);
         final TreeSet<Text> splits = 
TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
             splitBounds.getSecond().longValue(), splitsPerFuture);
-        totalSplits.addAll(splits);
         tasks.add(() -> {
           c.tableOperations().addSplits(tableName, splits);
+          totalSplits.addAll(splits);
           return null;
         });
       }
@@ -381,19 +393,49 @@ public class SplitIT extends AccumuloClusterHarness {
       List<Future<Void>> futures =
           tasks.parallelStream().map(es::submit).collect(Collectors.toList());
 
+      Set<Text> splitsAfterOffline = null;
+      if (offlineTable) {
+        // run offline concurrently with split operation
+        c.tableOperations().offline(tableName, true);
+        splitsAfterOffline = 
Set.copyOf(c.tableOperations().listSplits(tableName));
+      }
+
       log.debug("Waiting for futures to complete");
       for (Future<?> f : futures) {
-        f.get();
+        try {
+          f.get();
+        } catch (ExecutionException ee) {
+          if (offlineTable && ee.getMessage().contains("is offline")) {
+            // Some exceptions are expected when concurrently taking the table 
offline.
+            log.debug(ee.getMessage());
+          } else {
+            throw ee;
+          }
+        }
       }
-      es.shutdown();
 
-      log.debug("Checking that {} splits were created ", totalSplits.size());
+      if (offlineTable) {
+        // The splits seen immediately after offline() call should not change 
after all the futures
+        // complete. This ensures that nothing changes in the tablet after the 
offline+wait call
+        // returns.
+        assertEquals(splitsAfterOffline, new 
HashSet<>(c.tableOperations().listSplits(tableName)),
+            "Splits changed after offline");
+
+        // table will be scanned for verification, so bring it online
+        c.tableOperations().online(tableName);
+      } else {
+        assertFalse(totalSplits.isEmpty());
+      }
 
+      log.debug("Checking that {} splits were created ", totalSplits.size());
       assertEquals(totalSplits, new 
HashSet<>(c.tableOperations().listSplits(tableName)),
           "Did not see expected splits");
 
       log.debug("Verifying {} rows ingested into {}", numRows, tableName);
       VerifyIngest.verifyIngest(c, params);
+
+      es.shutdown();
+
     }
   }
 

Reply via email to