This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new bd199baf91 Use conditional mutations for minor compactions (#3863) bd199baf91 is described below commit bd199baf917c801f4e8bd88537da09ea93ce9335 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Oct 18 16:12:33 2023 -0400 Use conditional mutations for minor compactions (#3863) Use conditional mutations for tablet updates to ensure the location is as expected and also check that a few other tablet metadata fields are as expected. --- .../metadata/ConditionalTabletMutatorImpl.java | 10 ++ .../accumulo/server/util/ManagerMetadataUtil.java | 38 +------ .../accumulo/server/util/MetadataTableUtil.java | 8 -- .../org/apache/accumulo/tserver/tablet/Tablet.java | 118 ++++++++++++++++----- .../org/apache/accumulo/test/ComprehensiveIT.java | 6 ++ .../test/functional/AmpleConditionalWriterIT.java | 52 ++++++++- 6 files changed, 160 insertions(+), 72 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 4393a09110..fb3d1dc9e5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -22,6 +22,7 @@ package org.apache.accumulo.server.metadata; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN; import 
static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; @@ -231,6 +232,15 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit mutation.addCondition(c); } break; + case FLUSH_ID: { + Condition c = + new Condition(FLUSH_COLUMN.getColumnFamily(), FLUSH_COLUMN.getColumnQualifier()); + if (tabletMetadata.getFlushId().isPresent()) { + c = c.setValue(Long.toString(tabletMetadata.getFlushId().getAsLong())); + } + mutation.addCondition(c); + } + break; default: throw new UnsupportedOperationException("Column type " + type + " is not supported."); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java index f109991cc1..69e7da8e6e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java @@ -24,8 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -179,37 +177,6 @@ public class ManagerMetadataUtil { } } - /** - * Update tablet file data from flush. Returns a StoredTabletFile if there are data entries. 
- */ - public static Optional<StoredTabletFile> updateTabletDataFile(ServerContext context, - KeyExtent extent, ReferencedTabletFile newDatafile, DataFileValue dfv, MetadataTime time, - TServerInstance tServerInstance, ServiceLock zooLock, Set<String> unusedWalLogs, - Location lastLocation, long flushId) { - - // ELASTICITY_TODO use conditional mutation and require tablet location - TabletMutator tablet = context.getAmple().mutateTablet(extent); - // if there are no entries, the path doesn't get stored in metadata table, only the flush ID - Optional<StoredTabletFile> newFile = Optional.empty(); - - // if entries are present, write to path to metadata table - if (dfv.getNumEntries() > 0) { - tablet.putFile(newDatafile, dfv); - tablet.putTime(time); - newFile = Optional.of(newDatafile.insert()); - - updateLastForCompactionMode(context, tablet, lastLocation, tServerInstance); - } - tablet.putFlushId(flushId); - - unusedWalLogs.forEach(tablet::deleteWal); - - tablet.putZooLock(context.getZooKeeperRoot(), zooLock); - - tablet.mutate(); - return newFile; - } - /** * Update the last location if the location mode is "assignment". 
This will delete the previous * last location if needed and set the new last location @@ -240,8 +207,9 @@ public class ManagerMetadataUtil { * @param lastLocation The last location * @param tServerInstance The server address */ - public static void updateLastForCompactionMode(ClientContext context, TabletMutator tabletMutator, - Location lastLocation, TServerInstance tServerInstance) { + public static void updateLastForCompactionMode(ClientContext context, + Ample.ConditionalTabletMutator tabletMutator, Location lastLocation, + TServerInstance tServerInstance) { // if the location mode is 'compaction', then preserve the current compaction location in the // last location value if ("compaction".equals(context.getConfiguration().get(Property.TSERV_LAST_LOCATION_MODE))) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 1a2db0d106..c6b2e8689d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -141,14 +141,6 @@ public class MetadataTableUtil { log.error("Failed to write metadata updates for extent {} {}", extent, m.prettyPrint(), e); } - public static void updateTabletFlushID(KeyExtent extent, long flushID, ServerContext context, - ServiceLock zooLock) { - TabletMutator tablet = context.getAmple().mutateTablet(extent); - tablet.putFlushId(flushID); - tablet.putZooLock(context.getZooKeeperRoot(), zooLock); - tablet.mutate(); - } - public static Map<StoredTabletFile,DataFileValue> updateTabletDataFile(long tid, KeyExtent extent, Map<ReferencedTabletFile,DataFileValue> estSizes, MetadataTime time, ServerContext context, ServiceLock zooLock) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
index 38853da19d..afa6380ffd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -63,6 +62,7 @@ import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -426,10 +426,6 @@ public class Tablet extends TabletBase { return; } - if (getMetadata().getFlushId().orElse(-1) >= tableFlushID) { - return; - } - if (isClosing() || isClosed() || isBeingDeleted() || getTabletMemory().memoryReservedForMinC()) { return; @@ -447,18 +443,37 @@ public class Tablet extends TabletBase { refreshLock.lock(); try { // if multiple threads were allowed to update this outside of a sync block, then it would - // be - // a race condition - // ELASTICITY_TODO use conditional mutations - MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, context, - getTabletServer().getLock()); - // It is important the the refresh lock is held for the update above and the refresh below - // to avoid race conditions. 
- refreshMetadata(RefreshPurpose.FLUSH_ID_UPDATE); + var lastTabletMetadata = getMetadata(); + + // Check flush id while holding refresh lock to prevent race condition with other threads + // in tablet reading and writing the tablet's metadata. + if (lastTabletMetadata.getFlushId().orElse(-1) < tableFlushID) { + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) { + var tablet = tabletsMutator.mutateTablet(extent) + .requireLocation(Location.current(tabletServer.getTabletSession())) + .requireSame(lastTabletMetadata, ColumnType.PREV_ROW, ColumnType.FLUSH_ID); + + tablet.putFlushId(tableFlushID); + tablet.putZooLock(context.getZooKeeperRoot(), getTabletServer().getLock()); + tablet + .submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == tableFlushID); + + var result = tabletsMutator.process().get(extent); + + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + throw new IllegalStateException("Failed to update flush id " + extent + " " + + tabletServer.getTabletSession() + " " + tableFlushID); + } + } + + // It is important that the refresh lock is held for the update above and the refresh + // below to avoid race conditions. + refreshMetadata(RefreshPurpose.FLUSH_ID_UPDATE); + } } finally { refreshLock.unlock(); } - } else if (initiateMinor) { initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER); } @@ -1283,20 +1298,65 @@ public class Tablet extends TabletBase { ReferencedTabletFile newDatafile, DataFileValue dfv, Set<String> unusedWalLogs, long flushId) { - // expect time to only move forward from what was recently seen in metadata table - Preconditions.checkArgument(maxCommittedTime >= getMetadata().getTime().getTime()); + Preconditions.checkState(refreshLock.isHeldByCurrentThread()); + + // Read these once so that logging stays consistent even if there are buggy race conditions. + // If all other code is locking properly these should not change during this method.
+ var lastTabletMetadata = getMetadata(); + var expectedTime = lastTabletMetadata.getTime(); + + // Expect time to only move forward from what was recently seen in metadata table. + Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime()); + + // The tablet time is used to determine if the write succeeded, in order to do this the tablet + // time needs to be different from what is currently stored in the metadata table. + while (maxCommittedTime == expectedTime.getTime()) { + var nextTime = tabletTime.getAndUpdateTime(); + Preconditions.checkState(nextTime >= maxCommittedTime); + if (nextTime > maxCommittedTime) { + maxCommittedTime++; + } + } + + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) { + var tablet = tabletsMutator.mutateTablet(extent) + .requireLocation(Location.current(tabletServer.getTabletSession())) + .requireSame(lastTabletMetadata, ColumnType.PREV_ROW, ColumnType.TIME); + + Optional<StoredTabletFile> newFile = Optional.empty(); + + // if entries are present, write to path to metadata table + if (dfv.getNumEntries() > 0) { + tablet.putFile(newDatafile, dfv); + newFile = Optional.of(newDatafile.insert()); + + ManagerMetadataUtil.updateLastForCompactionMode(getContext(), tablet, lastLocation, + tabletServer.getTabletSession()); + } + + var newTime = tabletTime.getMetadataTime(maxCommittedTime); + tablet.putTime(newTime); - // ELASTICITY_TODO use conditional mutation, can check time and location + tablet.putFlushId(flushId); - // ELASTICITY_TODO minor compaction will need something like the bulk import loaded column - // to avoid : partial write, compact of file in partial write, and then another write of the - // file - // leading to the file being added twice. 
+ unusedWalLogs.forEach(tablet::deleteWal); - return ManagerMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent, - newDatafile, dfv, tabletTime.getMetadataTime(maxCommittedTime), - tabletServer.getTabletSession(), tabletServer.getLock(), unusedWalLogs, lastLocation, - flushId); + tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock()); + + // When trying to determine if write was successful, check if the time was updated. Can not + // check if the new file exists because of two reasons. First, it could be compacted away + // between the write and check. Second, some flushes do not produce a file. + tablet.submit(tabletMetadata -> tabletMetadata.getTime().equals(newTime)); + + if (tabletsMutator.process().get(extent).getStatus() + != Ample.ConditionalResult.Status.ACCEPTED) { + // Include the things that could have caused the write to fail. + throw new IllegalStateException("Unable to write minor compaction. " + extent + " " + + tabletServer.getTabletSession() + " " + expectedTime); + } + + return newFile; + } } @Override @@ -1360,7 +1420,7 @@ public class Tablet extends TabletBase { // The purpose of this lock is to prevent race conditions between concurrent refresh RPC calls and // between minor compactions and refresh calls. 
- private final Lock refreshLock = new ReentrantLock(); + private final ReentrantLock refreshLock = new ReentrantLock(); void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile, ReferencedTabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, @@ -1468,6 +1528,12 @@ public class Tablet extends TabletBase { // scans TabletMetadata tabletMetadata = getContext().getAmple().readTablet(getExtent()); + Preconditions.checkState(tabletMetadata != null, "Tablet no longer exists %s", getExtent()); + Preconditions.checkState( + Location.current(tabletServer.getTabletSession()).equals(tabletMetadata.getLocation()), + "Tablet %s location %s is not this tserver %s", getExtent(), tabletMetadata.getLocation(), + tabletServer.getTabletSession()); + synchronized (this) { var prevMetadata = latestMetadata; latestMetadata = tabletMetadata; diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java index 1cd5ce06cd..cec2e224c6 100644 --- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java @@ -207,6 +207,12 @@ public class ComprehensiveIT extends SharedMiniClusterBase { Wait.waitFor(() -> expected.equals(scan(client, table, AUTHORIZATIONS))); verifyData(client, table, AUTHORIZATIONS, expected); + + // flush a table with no unflushed data, tablet servers take a different code path for this + // case + client.tableOperations().flush(table, null, null, true); + + verifyData(client, table, AUTHORIZATIONS, expected); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 81883b0e2b..452ee25dcc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; @@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collection; @@ -525,7 +527,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } @Test - public void testMultipleExtents() throws Exception { + public void testMultipleExtents() { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var ts1 = new TServerInstance("localhost:9997", 5000L); var ts2 = new TServerInstance("localhost:9997", 6000L); @@ -576,7 +578,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } @Test - public void testOperations() throws Exception { + public void testOperations() { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var context = cluster.getServerContext(); @@ -691,7 +693,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } @Test - public 
void testRootTabletUpdate() throws Exception { + public void testRootTabletUpdate() { var context = cluster.getServerContext(); var rootMeta = context.getAmple().readTablet(RootTable.EXTENT); @@ -754,4 +756,48 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } } + + @Test + public void testFlushId() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + + var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build(); + assertTrue(tabletMeta1.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, FLUSH_ID) + .putFlushId(43L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 43L); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID); + assertFalse(tabletMeta2.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, FLUSH_ID) + .putFlushId(43L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 43L); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(43L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build(); + assertTrue(tabletMeta3.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, FLUSH_ID) + .putFlushId(44L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 44L); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, 
results.get(e1).getStatus()); + assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, FLUSH_ID) + .putFlushId(45L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 45L); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + } + } }