This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new bd199baf91 Use conditional mutations for minor compactions (#3863)
bd199baf91 is described below

commit bd199baf917c801f4e8bd88537da09ea93ce9335
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Oct 18 16:12:33 2023 -0400

    Use conditional mutations for minor compactions (#3863)
    
    Use conditional mutations for tablet updates to ensure the location is
    as expected and also check that a few other tablet metadata fields are
    as expected.
---
 .../metadata/ConditionalTabletMutatorImpl.java     |  10 ++
 .../accumulo/server/util/ManagerMetadataUtil.java  |  38 +------
 .../accumulo/server/util/MetadataTableUtil.java    |   8 --
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 118 ++++++++++++++++-----
 .../org/apache/accumulo/test/ComprehensiveIT.java  |   6 ++
 .../test/functional/AmpleConditionalWriterIT.java  |  52 ++++++++-
 6 files changed, 160 insertions(+), 72 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 4393a09110..fb3d1dc9e5 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -22,6 +22,7 @@ package org.apache.accumulo.server.metadata;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
@@ -231,6 +232,15 @@ public class ConditionalTabletMutatorImpl extends 
TabletMutatorBase<Ample.Condit
         mutation.addCondition(c);
       }
         break;
+      case FLUSH_ID: {
+        Condition c =
+            new Condition(FLUSH_COLUMN.getColumnFamily(), 
FLUSH_COLUMN.getColumnQualifier());
+        if (tabletMetadata.getFlushId().isPresent()) {
+          c = 
c.setValue(Long.toString(tabletMetadata.getFlushId().getAsLong()));
+        }
+        mutation.addCondition(c);
+      }
+        break;
       default:
         throw new UnsupportedOperationException("Column type " + type + " is 
not supported.");
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
index f109991cc1..69e7da8e6e 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
@@ -24,8 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -179,37 +177,6 @@ public class ManagerMetadataUtil {
     }
   }
 
-  /**
-   * Update tablet file data from flush. Returns a StoredTabletFile if there 
are data entries.
-   */
-  public static Optional<StoredTabletFile> updateTabletDataFile(ServerContext 
context,
-      KeyExtent extent, ReferencedTabletFile newDatafile, DataFileValue dfv, 
MetadataTime time,
-      TServerInstance tServerInstance, ServiceLock zooLock, Set<String> 
unusedWalLogs,
-      Location lastLocation, long flushId) {
-
-    // ELASTICITY_TODO use conditional mutation and require tablet location
-    TabletMutator tablet = context.getAmple().mutateTablet(extent);
-    // if there are no entries, the path doesn't get stored in metadata table, 
only the flush ID
-    Optional<StoredTabletFile> newFile = Optional.empty();
-
-    // if entries are present, write to path to metadata table
-    if (dfv.getNumEntries() > 0) {
-      tablet.putFile(newDatafile, dfv);
-      tablet.putTime(time);
-      newFile = Optional.of(newDatafile.insert());
-
-      updateLastForCompactionMode(context, tablet, lastLocation, 
tServerInstance);
-    }
-    tablet.putFlushId(flushId);
-
-    unusedWalLogs.forEach(tablet::deleteWal);
-
-    tablet.putZooLock(context.getZooKeeperRoot(), zooLock);
-
-    tablet.mutate();
-    return newFile;
-  }
-
   /**
    * Update the last location if the location mode is "assignment". This will 
delete the previous
    * last location if needed and set the new last location
@@ -240,8 +207,9 @@ public class ManagerMetadataUtil {
    * @param lastLocation The last location
    * @param tServerInstance The server address
    */
-  public static void updateLastForCompactionMode(ClientContext context, 
TabletMutator tabletMutator,
-      Location lastLocation, TServerInstance tServerInstance) {
+  public static void updateLastForCompactionMode(ClientContext context,
+      Ample.ConditionalTabletMutator tabletMutator, Location lastLocation,
+      TServerInstance tServerInstance) {
     // if the location mode is 'compaction', then preserve the current 
compaction location in the
     // last location value
     if 
("compaction".equals(context.getConfiguration().get(Property.TSERV_LAST_LOCATION_MODE)))
 {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 1a2db0d106..c6b2e8689d 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -141,14 +141,6 @@ public class MetadataTableUtil {
     log.error("Failed to write metadata updates for extent {} {}", extent, 
m.prettyPrint(), e);
   }
 
-  public static void updateTabletFlushID(KeyExtent extent, long flushID, 
ServerContext context,
-      ServiceLock zooLock) {
-    TabletMutator tablet = context.getAmple().mutateTablet(extent);
-    tablet.putFlushId(flushID);
-    tablet.putZooLock(context.getZooKeeperRoot(), zooLock);
-    tablet.mutate();
-  }
-
   public static Map<StoredTabletFile,DataFileValue> updateTabletDataFile(long 
tid, KeyExtent extent,
       Map<ReferencedTabletFile,DataFileValue> estSizes, MetadataTime time, 
ServerContext context,
       ServiceLock zooLock) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 38853da19d..afa6380ffd 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -63,6 +62,7 @@ import 
org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -426,10 +426,6 @@ public class Tablet extends TabletBase {
           return;
         }
 
-        if (getMetadata().getFlushId().orElse(-1) >= tableFlushID) {
-          return;
-        }
-
         if (isClosing() || isClosed() || isBeingDeleted()
             || getTabletMemory().memoryReservedForMinC()) {
           return;
@@ -447,18 +443,37 @@ public class Tablet extends TabletBase {
         refreshLock.lock();
         try {
           // if multiple threads were allowed to update this outside of a sync 
block, then it would
-          // be
-          // a race condition
-          // ELASTICITY_TODO use conditional mutations
-          MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, context,
-              getTabletServer().getLock());
-          // It is important the the refresh lock is held for the update above 
and the refresh below
-          // to avoid race conditions.
-          refreshMetadata(RefreshPurpose.FLUSH_ID_UPDATE);
+          // be a race condition
+          var lastTabletMetadata = getMetadata();
+
+          // Check flush id while holding refresh lock to prevent race 
condition with other threads
+          // in tablet reading and writing the tablet's metadata.
+          if (lastTabletMetadata.getFlushId().orElse(-1) < tableFlushID) {
+            try (var tabletsMutator = 
getContext().getAmple().conditionallyMutateTablets()) {
+              var tablet = tabletsMutator.mutateTablet(extent)
+                  
.requireLocation(Location.current(tabletServer.getTabletSession()))
+                  .requireSame(lastTabletMetadata, ColumnType.PREV_ROW, 
ColumnType.FLUSH_ID);
+
+              tablet.putFlushId(tableFlushID);
+              tablet.putZooLock(context.getZooKeeperRoot(), 
getTabletServer().getLock());
+              tablet
+                  .submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == tableFlushID);
+
+              var result = tabletsMutator.process().get(extent);
+
+              if (result.getStatus() != 
Ample.ConditionalResult.Status.ACCEPTED) {
+                throw new IllegalStateException("Failed to update flush id " + 
extent + " "
+                    + tabletServer.getTabletSession() + " " + tableFlushID);
+              }
+            }
+
+            // It is important that the refresh lock is held for the update 
above and the refresh
+            // below to avoid race conditions.
+            refreshMetadata(RefreshPurpose.FLUSH_ID_UPDATE);
+          }
         } finally {
           refreshLock.unlock();
         }
-
       } else if (initiateMinor) {
         initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
       }
@@ -1283,20 +1298,65 @@ public class Tablet extends TabletBase {
       ReferencedTabletFile newDatafile, DataFileValue dfv, Set<String> 
unusedWalLogs,
       long flushId) {
 
-    // expect time to only move forward from what was recently seen in 
metadata table
-    Preconditions.checkArgument(maxCommittedTime >= 
getMetadata().getTime().getTime());
+    Preconditions.checkState(refreshLock.isHeldByCurrentThread());
+
+    // Read these once so that, even with buggy race conditions, logging is 
consistent. If all other
+    // code is locking properly these should not change during this method.
+    var lastTabletMetadata = getMetadata();
+    var expectedTime = lastTabletMetadata.getTime();
+
+    // Expect time to only move forward from what was recently seen in 
metadata table.
+    Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime());
+
+    // The tablet time is used to determine if the write succeeded, in order 
to do this the tablet
+    // time needs to be different from what is currently stored in the 
metadata table.
+    while (maxCommittedTime == expectedTime.getTime()) {
+      var nextTime = tabletTime.getAndUpdateTime();
+      Preconditions.checkState(nextTime >= maxCommittedTime);
+      if (nextTime > maxCommittedTime) {
+        maxCommittedTime++;
+      }
+    }
+
+    try (var tabletsMutator = 
getContext().getAmple().conditionallyMutateTablets()) {
+      var tablet = tabletsMutator.mutateTablet(extent)
+          .requireLocation(Location.current(tabletServer.getTabletSession()))
+          .requireSame(lastTabletMetadata, ColumnType.PREV_ROW, 
ColumnType.TIME);
+
+      Optional<StoredTabletFile> newFile = Optional.empty();
+
+      // if entries are present, write the path to the metadata table
+      if (dfv.getNumEntries() > 0) {
+        tablet.putFile(newDatafile, dfv);
+        newFile = Optional.of(newDatafile.insert());
+
+        ManagerMetadataUtil.updateLastForCompactionMode(getContext(), tablet, 
lastLocation,
+            tabletServer.getTabletSession());
+      }
+
+      var newTime = tabletTime.getMetadataTime(maxCommittedTime);
+      tablet.putTime(newTime);
 
-    // ELASTICITY_TODO use conditional mutation, can check time and location
+      tablet.putFlushId(flushId);
 
-    // ELASTICITY_TODO minor compaction will need something like the bulk 
import loaded column
-    // to avoid : partial write, compact of file in partial write, and then 
another write of the
-    // file
-    // leading to the file being added twice.
+      unusedWalLogs.forEach(tablet::deleteWal);
 
-    return 
ManagerMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent,
-        newDatafile, dfv, tabletTime.getMetadataTime(maxCommittedTime),
-        tabletServer.getTabletSession(), tabletServer.getLock(), 
unusedWalLogs, lastLocation,
-        flushId);
+      tablet.putZooLock(getContext().getZooKeeperRoot(), 
tabletServer.getLock());
+
+      // When trying to determine if write was successful, check if the time 
was updated. Can not
+      // check if the new file exists because of two reasons. First, it could 
be compacted away
+      // between the write and check. Second, some flushes do not produce a 
file.
+      tablet.submit(tabletMetadata -> 
tabletMetadata.getTime().equals(newTime));
+
+      if (tabletsMutator.process().get(extent).getStatus()
+          != Ample.ConditionalResult.Status.ACCEPTED) {
+        // Include the things that could have caused the write to fail.
+        throw new IllegalStateException("Unable to write minor compaction.  " 
+ extent + " "
+            + tabletServer.getTabletSession() + " " + expectedTime);
+      }
+
+      return newFile;
+    }
   }
 
   @Override
@@ -1360,7 +1420,7 @@ public class Tablet extends TabletBase {
 
   // The purpose of this lock is to prevent race conditions between concurrent 
refresh RPC calls and
   // between minor compactions and refresh calls.
-  private final Lock refreshLock = new ReentrantLock();
+  private final ReentrantLock refreshLock = new ReentrantLock();
 
   void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile,
       ReferencedTabletFile newDatafile, DataFileValue dfv, CommitSession 
commitSession,
@@ -1468,6 +1528,12 @@ public class Tablet extends TabletBase {
       // scans
       TabletMetadata tabletMetadata = 
getContext().getAmple().readTablet(getExtent());
 
+      Preconditions.checkState(tabletMetadata != null, "Tablet no longer exits 
%s", getExtent());
+      Preconditions.checkState(
+          
Location.current(tabletServer.getTabletSession()).equals(tabletMetadata.getLocation()),
+          "Tablet % location %s is not this tserver %s", getExtent(), 
tabletMetadata.getLocation(),
+          tabletServer.getTabletSession());
+
       synchronized (this) {
         var prevMetadata = latestMetadata;
         latestMetadata = tabletMetadata;
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java 
b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
index 1cd5ce06cd..cec2e224c6 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
@@ -207,6 +207,12 @@ public class ComprehensiveIT extends SharedMiniClusterBase 
{
       Wait.waitFor(() -> expected.equals(scan(client, table, AUTHORIZATIONS)));
 
       verifyData(client, table, AUTHORIZATIONS, expected);
+
+      // flush a table with no unflushed data, tablet servers take a different 
code path for this
+      // case
+      client.tableOperations().flush(table, null, null, true);
+
+      verifyData(client, table, AUTHORIZATIONS, expected);
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 81883b0e2b..452ee25dcc 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
@@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -525,7 +527,7 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
   }
 
   @Test
-  public void testMultipleExtents() throws Exception {
+  public void testMultipleExtents() {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       var ts1 = new TServerInstance("localhost:9997", 5000L);
       var ts2 = new TServerInstance("localhost:9997", 6000L);
@@ -576,7 +578,7 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
   }
 
   @Test
-  public void testOperations() throws Exception {
+  public void testOperations() {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       var context = cluster.getServerContext();
 
@@ -691,7 +693,7 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
   }
 
   @Test
-  public void testRootTabletUpdate() throws Exception {
+  public void testRootTabletUpdate() {
     var context = cluster.getServerContext();
 
     var rootMeta = context.getAmple().readTablet(RootTable.EXTENT);
@@ -754,4 +756,48 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness {
       }
     }
   }
+
+  @Test
+  public void testFlushId() {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      var context = cluster.getServerContext();
+
+      assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
+
+      var ctmi = new ConditionalTabletsMutatorImpl(context);
+
+      var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build();
+      assertTrue(tabletMeta1.getFlushId().isPresent());
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, 
FLUSH_ID)
+          .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
+      var results = ctmi.process();
+      assertEquals(Status.REJECTED, results.get(e1).getStatus());
+      assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty());
+
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID);
+      assertFalse(tabletMeta2.getFlushId().isPresent());
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, 
FLUSH_ID)
+          .putFlushId(43L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 43L);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
+      assertEquals(43L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
+
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build();
+      assertTrue(tabletMeta1.getFlushId().isPresent());
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
+          .putFlushId(44L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 44L);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
+      assertEquals(44L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
+
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, 
FLUSH_ID)
+          .putFlushId(45L).submit(tabletMetadata -> 
tabletMetadata.getFlushId().orElse(-1) == 45L);
+      results = ctmi.process();
+      assertEquals(Status.REJECTED, results.get(e1).getStatus());
+      assertEquals(44L, 
context.getAmple().readTablet(e1).getFlushId().getAsLong());
+    }
+  }
 }

Reply via email to