This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8cd904138a fixes race condition w/ split,compaction relative to offline (#4629) 8cd904138a is described below commit 8cd904138a85197a03dc71542c6a0410400c8505 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Jun 3 16:09:51 2024 -0400 fixes race condition w/ split,compaction relative to offline (#4629) This commit wraps up #3412 and includes the following changes * compactors now cancel running compactions when the table state is no longer online; this avoids doing work that will never be used * coordinator will no longer start committing compactions when the table is offline; this avoids a race condition w/ offline+wait of table * fixed bug found in tablet refresher where it was not properly handling concurrent tablet unloads (found by testing concurrent compaction and offline) * compaction fate operation that drives table compaction will now fail when the table is offline (this change may be needed in older versions) * reordered when the split fate operation checks for table offline to avoid a race condition with offline+wait of table * added multiple ITs to test running split and compaction concurrently with offline table operation --- .../org/apache/accumulo/compactor/Compactor.java | 8 ++ .../coordinator/CompactionCoordinator.java | 14 +++ .../manager/tableOps/bulkVer2/TabletRefresher.java | 49 ++++++++- .../manager/tableOps/compact/CompactionDriver.java | 6 ++ .../split/AllocateDirsAndEnsureOnline.java | 110 +++++++++++++++++++++ .../accumulo/manager/tableOps/split/PreSplit.java | 29 +----- .../accumulo/manager/tableOps/split/SplitInfo.java | 2 +- .../apache/accumulo/test/fate/ManagerRepoIT.java | 52 ++++++++++ .../accumulo/test/functional/CompactionIT.java | 94 +++++++++++++++++- .../apache/accumulo/test/functional/SplitIT.java | 52 +++++++++- 10 files changed, 379 insertions(+), 37 deletions(-) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 4c5357e35a..d05324da53 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -75,6 +75,7 @@ import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -204,6 +205,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac return; } + var tableState = getContext().getTableState(extent.tableId()); + if (tableState != TableState.ONLINE) { + LOG.info("Cancelling compaction {} because table state is {}", ecid, tableState); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } + if (job.getKind() == TCompactionKind.USER) { var cconf = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 7642003985..f9872c3687 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -740,6 +740,20 @@ public class CompactionCoordinator var tabletMeta = ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + var tableState = manager.getContext().getTableState(extent.tableId()); + if (tableState != 
TableState.ONLINE) { + // Its important this check is done after the compaction id is set in the metadata table to + // avoid race conditions with the client code that waits for tables to go offline. That code + // looks for compaction ids in the metadata table after setting the table state. When that + // client code sees nothing for a tablet its important that nothing will changes the tablets + // files after that point in time which this check ensure. + LOG.debug("Not committing compaction {} for {} because of table state {}", ecid, extent, + tableState); + // cleanup metadata table and files related to the compaction + compactionsFailed(Map.of(ecid, extent)); + return; + } + if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { return; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index cb963e583a..8bff563391 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -23,9 +23,11 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -35,6 +37,7 @@ import java.util.function.Supplier; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.TServerInstance; @@ -106,8 +109,8 @@ public class 
TabletRefresher { // Ask tablet server to reload the metadata for these tablets. The tablet server returns // the list of extents it was hosting but was unable to refresh (the tablets could be in - // the process of loading). If it is not currently hosting the tablet it treats that as - // refreshed and does not return anything for it. + // the process of loading). If it is not currently hosting the tablet it does not return + // anything for it. Future<List<TKeyExtent>> future = threadPool .submit(() -> sendSyncRefreshRequest(context, logId, entry.getKey(), entry.getValue())); @@ -141,6 +144,48 @@ public class TabletRefresher { .removeIf(location -> !liveTservers.contains(location.getServerInstance())); } + if (!refreshesNeeded.isEmpty()) { + // look for any tablets where the location changed, these tablets will no longer need a + // refresh because when the tablet loads at the new location it will see the new tablet + // metadata + HashMap<KeyExtent,TabletMetadata.Location> prevLocations = new HashMap<>(); + refreshesNeeded.forEach((loc, extents) -> { + for (TKeyExtent te : extents) { + var extent = KeyExtent.fromThrift(te); + prevLocations.put(extent, loc); + } + }); + + // Build a map of tablets that exist and their current location. No need to includes tablets + // that no longer exists or do not have a location as later logic is ok w/ these being null. 
+ HashMap<KeyExtent,TabletMetadata.Location> currLocations = new HashMap<>(); + try (var tablets = + context.getAmple().readTablets().forTablets(prevLocations.keySet(), Optional.empty()) + .fetch(ColumnType.LOCATION).build()) { + tablets.forEach(tablet -> { + if (tablet.getLocation() != null) { + currLocations.put(tablet.getExtent(), tablet.getLocation()); + } + }); + } + + refreshesNeeded.clear(); + + var finalrefreshesNeeded = refreshesNeeded; + // rebuild refreshesNeeded only including those where the location is still the same + prevLocations.forEach((extent, prevLoc) -> { + var currLoc = currLocations.get(extent); + // currLoc may be null and this is ok because it should not be equal then + if (prevLoc.equals(currLoc)) { + finalrefreshesNeeded.computeIfAbsent(currLoc, k -> new ArrayList<>()) + .add(extent.toThrift()); + } else { + log.trace("The location of {} changed from {} to {}, so refresh no longer needed", + extent, prevLoc, currLoc); + } + }); + } + if (!refreshesNeeded.isEmpty()) { try { retry.waitForNextAttempt(log, logId + " waiting for " + refreshesNeeded.size() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index d1ace6c816..74fd66b49d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.AccumuloTable; import 
org.apache.accumulo.core.metadata.StoredTabletFile; @@ -113,6 +114,11 @@ class CompactionDriver extends ManagerRepo { TableOperationsImpl.TABLE_DELETED_MSG); } + if (manager.getContext().getTableState(tableId) != TableState.ONLINE) { + throw new AcceptableThriftTableOperationException(tableId.canonical(), null, + TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, "The table is not online."); + } + long t1 = System.currentTimeMillis(); int tabletsToWaitFor = updateAndCheckTablets(manager, fateId); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java new file mode 100644 index 0000000000..0b090cbcd2 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.split; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AllocateDirsAndEnsureOnline extends ManagerRepo { + + private static final long serialVersionUID = 1L; + private static final Logger log = LoggerFactory.getLogger(PreSplit.class); + + private final SplitInfo splitInfo; + + public AllocateDirsAndEnsureOnline(SplitInfo splitInfo) { + this.splitInfo = splitInfo; + } + + @Override + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + // This check of table state is done after setting the operation id to avoid a race condition + // with the client code that waits for a table to go offline. That client code sets the table + // state and then scans the metadata table looking for split operations ids. If split checks + // tables state before setting the opid then there is race condition with the client. Setting it + // after ensures that in the case when the client does not see any split op id in the metadata + // table that it knows that any splits starting after that point in time will not complete. 
This + // order is needed because the split fate operation does not acquire a table lock in zookeeper. + if (manager.getContext().getTableState(splitInfo.getOriginal().tableId()) + != TableState.ONLINE) { + + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); + + // attempt to delete the operation id + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + + Ample.RejectionHandler rejectionHandler = new Ample.RejectionHandler() { + + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } + + @Override + public boolean test(TabletMetadata tabletMetadata) { + // if the tablet no longer exists or our operation id is not set then consider a success + return tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId()); + } + }; + + tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid) + .requireAbsentLocation().requireAbsentLogs().deleteOperation().submit(rejectionHandler); + + var result = tabletsMutator.process().get(splitInfo.getOriginal()); + + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + throw new IllegalStateException( + "Failed to delete operation id " + splitInfo.getOriginal()); + } + } + + throw new AcceptableThriftTableOperationException( + splitInfo.getOriginal().tableId().canonical(), null, TableOperation.SPLIT, + TableOperationExceptionType.OFFLINE, + "Unable to split tablet because the table is offline"); + } else { + // Create the dir name here for the next step. If the next step fails it will always have the + // same dir name each time it runs again making it idempotent. 
+ List<String> dirs = new ArrayList<>(); + + splitInfo.getSplits().forEach(split -> { + String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split); + dirs.add(dirName); + log.trace("{} allocated dir name {}", fateId, dirName); + }); + return new UpdateTablets(splitInfo, dirs); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 906e953f45..6d89878f95 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -23,20 +23,14 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.SortedSet; -import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; -import org.apache.accumulo.core.clientImpl.thrift.TableOperation; -import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -44,7 +38,6 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.manager.Manager; 
import org.apache.accumulo.manager.tableOps.ManagerRepo; -import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,15 +82,6 @@ public class PreSplit extends ManagerRepo { return 1000; } } else { - // do not set the operation id on the tablet if the table is offline - if (manager.getContext().getTableState(splitInfo.getOriginal().tableId()) - != TableState.ONLINE) { - throw new AcceptableThriftTableOperationException( - splitInfo.getOriginal().tableId().canonical(), null, TableOperation.SPLIT, - TableOperationExceptionType.OFFLINE, - "Unable to split tablet because the table is offline"); - } - try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation() @@ -155,18 +139,7 @@ public class PreSplit extends ManagerRepo { "Tablet unexpectedly had walogs %s %s %s", fateId, tabletMetadata.getLogs(), tabletMetadata.getExtent()); - // Create the dir name here for the next step. If the next step fails it will always have the - // same dir name each time it runs again making it idempotent. 
- - List<String> dirs = new ArrayList<>(); - - splitInfo.getSplits().forEach(split -> { - String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split); - dirs.add(dirName); - log.trace("{} allocated dir name {}", fateId, dirName); - }); - - return new UpdateTablets(splitInfo, dirs); + return new AllocateDirsAndEnsureOnline(splitInfo); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java index b8f8c7adff..7d97e6a34e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java @@ -37,7 +37,7 @@ public class SplitInfo implements Serializable { private final byte[] endRow; private final byte[][] splits; - SplitInfo(KeyExtent extent, SortedSet<Text> splits) { + public SplitInfo(KeyExtent extent, SortedSet<Text> splits) { this.tableId = extent.tableId(); this.prevEndRow = extent.prevEndRow() == null ? null : TextUtil.getBytes(extent.prevEndRow()); this.endRow = extent.endRow() == null ? 
null : TextUtil.getBytes(extent.endRow()); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java index 978fb3c491..e49de2fa9f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java @@ -28,13 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.UUID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; @@ -55,10 +58,13 @@ import org.apache.accumulo.manager.tableOps.merge.MergeInfo; import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; import org.apache.accumulo.manager.tableOps.merge.MergeTablets; import org.apache.accumulo.manager.tableOps.merge.ReserveTablets; +import org.apache.accumulo.manager.tableOps.split.AllocateDirsAndEnsureOnline; import org.apache.accumulo.manager.tableOps.split.FindSplits; import org.apache.accumulo.manager.tableOps.split.PreSplit; +import org.apache.accumulo.manager.tableOps.split.SplitInfo; import org.apache.accumulo.test.ample.metadata.TestAmple; import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -134,6 +140,52 @@ public class ManagerRepoIT extends SharedMiniClusterBase { } } + @Test + public void 
testSplitOffline() throws Exception { + String[] tableNames = getUniqueNames(2); + String metadataTable = tableNames[0]; + String userTable = tableNames[1]; + + // This test ensures a repo involved in splitting a tablet handles an offline table correctly + + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + TestAmple.createMetadataTable(client, metadataTable); + + // create a new table that is initially offline + client.tableOperations().create(userTable, new NewTableConfiguration().createOffline()); + + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable)); + + TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple + .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable)); + + testAmple.createMetadataFromExisting(client, tableId, + not(SplitColumnFamily.UNSPLITTABLE_COLUMN)); + + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + KeyExtent extent = new KeyExtent(tableId, null, null); + + // manually set an operation id on the tablet + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); + testAmple.mutateTablet(extent) + .putOperation(TabletOperationId.from(TabletOperationType.SPLITTING, fateId)).mutate(); + + Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple); + + assertEquals(opid, testAmple.readTablet(extent).getOperationId()); + + var eoRepo = new AllocateDirsAndEnsureOnline( + new SplitInfo(extent, new TreeSet<>(List.of(new Text("sp1"))))); + + // The repo should delete the opid and throw an exception + assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager)); + + // the operation id should have been cleaned up before the exception was thrown + assertNull(testAmple.readTablet(extent).getOperationId()); + } + } + @Test public void testFindSplitsUnsplittable() throws Exception { diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index bd26f0a469..e110786f15 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -34,6 +34,7 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; @@ -43,9 +44,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -96,6 +100,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.accumulo.test.compaction.CompactionExecutorIT.TestPlanner; @@ -1178,7 +1183,94 @@ public class CompactionIT extends AccumuloClusterHarness { ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1); } - private void writeRows(ClientContext client, String tableName, int rows, boolean wait) + @Test + public void testOfflineAndCompactions() throws Exception { + var uniqueNames = getUniqueNames(1); + String table = uniqueNames[0]; + + 
// This test exercises concurrent compactions and table offline. + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + SortedSet<Text> splits = new TreeSet<>(); + for (int i = 1; i < 32; i++) { + splits.add(new Text(String.format("r:%04d", i))); + } + + client.tableOperations().create(table, new NewTableConfiguration().withSplits(splits)); + writeRows(client, table, 33, true); + // create two files per tablet + writeRows(client, table, 33, true); + + var ctx = getCluster().getServerContext(); + var tableId = ctx.getTableId(table); + + // verify assumptions of test, expect all tablets to have files + var files0 = getFiles(ctx, tableId); + assertEquals(32, files0.size()); + assertFalse(files0.values().stream().anyMatch(Set::isEmpty)); + + // lower the tables compaction ratio to cause system compactions + client.tableOperations().setProperty(table, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + // start a bunch of compactions in the background + var executor = Executors.newCachedThreadPool(); + List<Future<?>> futures = new ArrayList<>(); + // start user compactions on a subset of the tables tablets, system compactions should attempt + // to run on all tablets. With concurrency should get a mix. 
+ for (int i = 1; i < 20; i++) { + var startRow = new Text(String.format("r:%04d", i - 1)); + var endRow = new Text(String.format("r:%04d", i)); + futures.add(executor.submit(() -> { + CompactionConfig config = new CompactionConfig(); + config.setWait(true); + config.setStartRow(startRow); + config.setEndRow(endRow); + client.tableOperations().compact(table, config); + return null; + })); + } + + log.debug("Waiting for offline"); + // take tablet offline while there are concurrent compactions + client.tableOperations().offline(table, true); + + // grab a snapshot of all the tablets files after waiting for offline, do not expect any + // tablets files to change at this point + var files1 = getFiles(ctx, tableId); + + // wait for the background compactions + log.debug("Waiting for futures"); + for (var future : futures) { + try { + future.get(); + } catch (ExecutionException ee) { + // its ok if some of the compactions fail because the table was concurrently taken offline + assertTrue(ee.getMessage().contains("is offline")); + } + } + + // grab a second snapshot of the tablets files after all the background operations completed + var files2 = getFiles(ctx, tableId); + + // do not expect the files to have changed after the offline operation returned. 
+ assertEquals(files1, files2); + + executor.shutdown(); + } + } + + private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, TableId tableId) { + Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>(); + try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) { + for (var tablet : tablets) { + files.put(tablet.getExtent(), tablet.getFiles()); + } + } + return files; + } + + private void writeRows(AccumuloClient client, String tableName, int rows, boolean wait) throws Exception { try (BatchWriter bw = client.createBatchWriter(tableName)) { for (int i = 0; i < rows; i++) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index 1985b96b32..f444eababe 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -42,6 +43,8 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -345,6 +348,15 @@ public class SplitIT extends AccumuloClusterHarness { @Test public void concurrentSplit() throws Exception { + 
concurrentSplit(false); + } + + @Test + public void concurrentSplitAndOffline() throws Exception { + concurrentSplit(true); + } + + public void concurrentSplit(boolean offlineTable) throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { final String tableName = getUniqueNames(1)[0]; @@ -364,15 +376,15 @@ public class SplitIT extends AccumuloClusterHarness { ExecutorService es = Executors.newFixedThreadPool(10); final int totalFutures = 100; final int splitsPerFuture = 4; - final Set<Text> totalSplits = new HashSet<>(); + final Set<Text> totalSplits = new ConcurrentSkipListSet<>(); List<Callable<Void>> tasks = new ArrayList<>(totalFutures); for (int i = 0; i < totalFutures; i++) { final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows); final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(), splitBounds.getSecond().longValue(), splitsPerFuture); - totalSplits.addAll(splits); tasks.add(() -> { c.tableOperations().addSplits(tableName, splits); + totalSplits.addAll(splits); return null; }); } @@ -381,19 +393,49 @@ public class SplitIT extends AccumuloClusterHarness { List<Future<Void>> futures = tasks.parallelStream().map(es::submit).collect(Collectors.toList()); + Set<Text> splitsAfterOffline = null; + if (offlineTable) { + // run offline concurrently with split operation + c.tableOperations().offline(tableName, true); + splitsAfterOffline = Set.copyOf(c.tableOperations().listSplits(tableName)); + } + log.debug("Waiting for futures to complete"); for (Future<?> f : futures) { - f.get(); + try { + f.get(); + } catch (ExecutionException ee) { + if (offlineTable && ee.getMessage().contains("is offline")) { + // Some exceptions are expected when concurrently taking the table offline. 
+ log.debug(ee.getMessage()); + } else { + throw ee; + } + } } - es.shutdown(); - log.debug("Checking that {} splits were created ", totalSplits.size()); + if (offlineTable) { + // The splits seen immediately after offline() call should not change after all the futures + // complete. This ensures that nothing changes in the tablet after the offline+wait call + // returns. + assertEquals(splitsAfterOffline, new HashSet<>(c.tableOperations().listSplits(tableName)), + "Splits changed after offline"); + + // table will be scanned for verification, so bring it online + c.tableOperations().online(tableName); + } else { + assertFalse(totalSplits.isEmpty()); + } + log.debug("Checking that {} splits were created ", totalSplits.size()); assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)), "Did not see expected splits"); log.debug("Verifying {} rows ingested into {}", numRows, tableName); VerifyIngest.verifyIngest(c, params); + + es.shutdown(); + } }