This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new cfb0e9c37b use conditional mutation to set tablet availability (#4636) cfb0e9c37b is described below commit cfb0e9c37bc1dfa5e23a1a743cc0bbf5e5fe10aa Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jun 5 13:36:45 2024 -0400 use conditional mutation to set tablet availability (#4636) Updated the fate operation that sets tablet availability to use conditional mutations. The fate code was refactored to do the metadata update in isReady because it may need to retry. Modified the fate operation to check if the table is offline and fail if it is. --- .../accumulo/manager/FateServiceHandler.java | 5 +- .../manager/tableOps/availability/LockTable.java | 70 ++++++++++++++++++++++ .../availability/SetTabletAvailability.java | 50 +++++++++++----- .../test/functional/TabletAvailabilityIT.java | 15 ++++- 4 files changed, 121 insertions(+), 19 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 3e45faca23..32303ade9d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -87,7 +87,7 @@ import org.apache.accumulo.core.util.tables.TableNameUtil; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.tableOps.ChangeTableState; import org.apache.accumulo.manager.tableOps.TraceRepo; -import org.apache.accumulo.manager.tableOps.availability.SetTabletAvailability; +import org.apache.accumulo.manager.tableOps.availability.LockTable; import org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImport; import org.apache.accumulo.manager.tableOps.clone.CloneTable; import org.apache.accumulo.manager.tableOps.compact.CompactRange; @@ -711,8 +711,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Set availability for table: " + tableName + "(" + tableId + ") range: " + tRange + " to: " + tabletAvailability.name(); manager.fate(type).seedTransaction(op.toString(), fateId, - new TraceRepo<>( - new SetTabletAvailability(tableId, namespaceId, tRange, tabletAvailability)), + new TraceRepo<>(new LockTable(tableId, namespaceId, tRange, tabletAvailability)), autoCleanup, goalMessage); break; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java new file mode 100644 index 0000000000..e6d099de1b --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.availability; + +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LockTable extends ManagerRepo { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(LockTable.class); + + private final TableId tableId; + private final NamespaceId namespaceId; + private final TRange tRange; + private final TabletAvailability tabletAvailability; + + public LockTable(TableId tableId, NamespaceId namespaceId, TRange range, + TabletAvailability tabletAvailability) { + this.tableId = tableId; + this.namespaceId = namespaceId; + this.tRange = range; + this.tabletAvailability = tabletAvailability; + } + + @Override + public long isReady(FateId fateId, Manager manager) throws Exception { + return Utils.reserveNamespace(manager, namespaceId, fateId, + DistributedReadWriteLock.LockType.READ, true, TableOperation.SET_TABLET_AVAILABILITY) + + Utils.reserveTable(manager, tableId, fateId, DistributedReadWriteLock.LockType.WRITE, + true, TableOperation.SET_TABLET_AVAILABILITY); + } + + @Override + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + return new SetTabletAvailability(tableId, namespaceId, tRange, tabletAvailability); + } + + @Override + public void undo(FateId fateId, Manager manager) throws Exception { + Utils.unreserveNamespace(manager, namespaceId, fateId, DistributedReadWriteLock.LockType.READ); + Utils.unreserveTable(manager, tableId, fateId, DistributedReadWriteLock.LockType.WRITE); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java index 6397ccb1e7..eda81d3ff3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java @@ -18,8 +18,13 @@ */ package org.apache.accumulo.manager.tableOps.availability; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.PartialKey; @@ -30,9 +35,11 @@ import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; -import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; @@ -60,14 +67,11 @@ public class SetTabletAvailability extends ManagerRepo { @Override public long isReady(FateId fateId, Manager manager) throws Exception { - return Utils.reserveNamespace(manager, namespaceId, fateId, LockType.READ, true, - TableOperation.SET_TABLET_AVAILABILITY) - + Utils.reserveTable(manager, tableId, fateId, LockType.WRITE, true, - TableOperation.SET_TABLET_AVAILABILITY); - } - @Override - public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + if (manager.getContext().getTableState(tableId) != TableState.ONLINE) { + throw new AcceptableThriftTableOperationException(tableId.canonical(), null, + TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, "The table is not online."); + } final Range range = new Range(tRange); LOG.debug("Finding tablets in Range: {} for table:{}", range, tableId); @@ -81,10 +85,21 @@ public class SetTabletAvailability extends ManagerRepo { // row is always inclusive. final Text scanRangeStart = (range.getStartKey() == null) ? null : range.getStartKey().getRow(); + AtomicLong notAccepted = new AtomicLong(); + + Consumer<Ample.ConditionalResult> resultsConsumer = result -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + notAccepted.incrementAndGet(); + LOG.debug("{} failed to set tablet availability for {}", fateId, result.getExtent()); + } + }; + + var start = NanoTime.now(); try ( TabletsMetadata m = manager.getContext().getAmple().readTablets().forTable(tableId) .overlapping(scanRangeStart, true, null).build(); - TabletsMutator mutator = manager.getContext().getAmple().mutateTablets()) { + Ample.AsyncConditionalTabletsMutator mutator = + manager.getContext().getAmple().conditionallyMutateTablets(resultsConsumer)) { for (TabletMetadata tm : m) { final KeyExtent tabletExtent = tm.getExtent(); LOG.trace("Evaluating tablet {} against range {}", tabletExtent, range); @@ -114,18 +129,23 @@ public class SetTabletAvailability extends ManagerRepo { LOG.debug("Setting tablet availability to {} requested for: {} ", tabletAvailability, tabletExtent); - mutator.mutateTablet(tabletExtent).putTabletAvailability(tabletAvailability).mutate(); + mutator.mutateTablet(tabletExtent).requireAbsentOperation() + .putTabletAvailability(tabletAvailability) + .submit(tabletMeta -> tabletMeta.getTabletAvailability() == tabletAvailability); } } - Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ); - Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE); - return null; + + if (notAccepted.get() > 0) { + return Math.min(30000, Math.max(start.elapsed().toMillis(), 1)); + } else { + return 0; + } } @Override - public void undo(FateId fateId, Manager manager) throws Exception { + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ); Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE); + return null; } - } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletAvailabilityIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletAvailabilityIT.java index c2bc1900a9..403878b3d2 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletAvailabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletAvailabilityIT.java @@ -22,6 +22,7 @@ import static org.apache.accumulo.core.client.admin.TabletAvailability.HOSTED; import static org.apache.accumulo.core.client.admin.TabletAvailability.UNHOSTED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashMap; @@ -37,6 +38,8 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.InvalidTabletHostingRequestException; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -46,6 +49,17 @@ import org.junit.jupiter.api.Test; public class TabletAvailabilityIT extends AccumuloClusterHarness { + @Test + public void testOffline() throws Exception { + String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table, new NewTableConfiguration().createOffline()); + + assertThrows(TableOfflineException.class, + () -> client.tableOperations().setTabletAvailability(table, new Range(), HOSTED)); + } + } + @Test public void testBoundries() throws Exception { // Tests operating on tablets with different tablet availabilites that are next to each other. @@ -158,5 +172,4 @@ public class TabletAvailabilityIT extends AccumuloClusterHarness { m.put("f", "q", String.format("%010d", val)); return m; } - }