This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b8089dd528 Add a merge column marker to make merge idempotent (#3957)
b8089dd528 is described below

commit b8089dd528dd959fa3eeea23063d47c61cacedeb
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Sat Nov 18 15:03:05 2023 -0500

    Add a merge column marker to make merge idempotent (#3957)
    
    The new MERGED marker allows the metadata updates of a no-chop merge
    to be resumed correctly after a failure, which makes merge idempotent.
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../core/metadata/schema/MetadataSchema.java       |  12 ++
 .../core/metadata/schema/TabletMetadata.java       |  13 +-
 .../core/metadata/schema/TabletsMetadata.java      |   4 +
 .../core/metadata/schema/TabletMetadataTest.java   |  29 ++++
 .../server/constraints/MetadataConstraints.java    |   4 +-
 .../accumulo/server/manager/state/MergeState.java  |   9 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   1 +
 .../accumulo/manager/TabletGroupWatcher.java       | 154 ++++++++++++++++++---
 .../apache/accumulo/manager/state/MergeStats.java  |   2 +-
 .../accumulo/manager/TabletGroupWatcherTest.java   |  20 +++
 .../java/org/apache/accumulo/test/MetaSplitIT.java |   9 ++
 .../apache/accumulo/test/functional/MergeIT.java   |  22 ++-
 .../accumulo/test/util/FileMetadataUtil.java       |   9 ++
 13 files changed, 256 insertions(+), 32 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 0cb57e586f..2bc0a6e18e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -325,6 +325,18 @@ public class MetadataSchema {
       public static final String STR_NAME = "ecomp";
       public static final Text NAME = new Text(STR_NAME);
     }
+
+    /**
+     * Column family for indicating that the files in a tablet contain fenced 
files that have been
+     * merged from other tablets during a merge operation. This is used to 
support resuming a failed
+     * merge operation.
+     */
+    public static class MergedColumnFamily {
+      public static final String STR_NAME = "merged";
+      public static final Text NAME = new Text(STR_NAME);
+      public static final ColumnFQ MERGED_COLUMN = new ColumnFQ(NAME, new 
Text(STR_NAME));
+      public static final Value MERGED_VALUE = new Value("merged");
+    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
index fccc3e6c43..af31605d51 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java
@@ -62,6 +62,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
@@ -104,6 +105,7 @@ public class TabletMetadata {
   private OptionalLong compact = OptionalLong.empty();
   private Double splitRatio = null;
   private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions;
+  private boolean merged;
 
   public enum LocationType {
     CURRENT, FUTURE, LAST
@@ -125,7 +127,8 @@ public class TabletMetadata {
     COMPACT_ID,
     SPLIT_RATIO,
     SUSPEND,
-    ECOMP
+    ECOMP,
+    MERGED
   }
 
   public static class Location {
@@ -345,6 +348,11 @@ public class TabletMetadata {
     return splitRatio;
   }
 
+  public boolean hasMerged() {
+    ensureFetched(ColumnType.MERGED);
+    return merged;
+  }
+
   public SortedMap<Key,Value> getKeyValues() {
     Preconditions.checkState(keyValues != null, "Requested key values when it 
was not saved");
     return keyValues;
@@ -479,6 +487,9 @@ public class TabletMetadata {
           extCompBuilder.put(ExternalCompactionId.of(qual),
               ExternalCompactionMetadata.fromJson(val));
           break;
+        case MergedColumnFamily.STR_NAME:
+          te.merged = true;
+          break;
         default:
           throw new IllegalStateException("Unexpected family " + fam);
       }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index d8811234df..515e6a08ef 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -76,6 +76,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -336,6 +337,9 @@ public class TabletsMetadata implements 
Iterable<TabletMetadata>, AutoCloseable
           case ECOMP:
             families.add(ExternalCompactionColumnFamily.NAME);
             break;
+          case MERGED:
+            families.add(MergedColumnFamily.NAME);
+            break;
           default:
             throw new IllegalArgumentException("Unknown col type " + 
colToFetch);
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
index a3a3d33a45..d547b8596d 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.core.metadata.schema;
 
 import static java.util.stream.Collectors.toSet;
 import static org.apache.accumulo.core.metadata.StoredTabletFile.serialize;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_COLUMN;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_VALUE;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
 import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN;
@@ -113,6 +115,8 @@ public class TabletMetadataTest {
     
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put("");
     
mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put("");
 
+    MERGED_COLUMN.put(mutation, new Value());
+
     SortedMap<Key,Value> rowMap = toRowMap(mutation);
 
     TabletMetadata tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(),
@@ -143,6 +147,7 @@ public class TabletMetadataTest {
     assertTrue(tm.sawPrevEndRow());
     assertEquals("M123456789", tm.getTime().encode());
     assertEquals(Set.of(sf1, sf2), Set.copyOf(tm.getScans()));
+    assertTrue(tm.hasMerged());
   }
 
   @Test
@@ -258,6 +263,30 @@ public class TabletMetadataTest {
     assertFalse(tm.hasCurrent());
   }
 
+  @Test
+  public void testMergedColumn() {
+    KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new 
Text("da"));
+
+    // Test merged column set
+    Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent);
+    MERGED_COLUMN.put(mutation, MERGED_VALUE);
+    TabletMetadata tm = 
TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+        EnumSet.of(ColumnType.MERGED), true);
+    assertTrue(tm.hasMerged());
+
+    // Column not set
+    mutation = TabletColumnFamily.createPrevRowMutation(extent);
+    tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+        EnumSet.of(ColumnType.MERGED), true);
+    assertFalse(tm.hasMerged());
+
+    // MERGED Column not fetched
+    mutation = TabletColumnFamily.createPrevRowMutation(extent);
+    tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(),
+        EnumSet.of(ColumnType.PREV_ROW), true);
+    assertThrows(IllegalStateException.class, tm::hasMerged);
+  }
+
   private SortedMap<Key,Value> toRowMap(Mutation mutation) {
     SortedMap<Key,Value> rowMap = new TreeMap<>();
     mutation.getUpdates().forEach(cu -> {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 936d7f16ef..ca45a05e8a 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -47,6 +47,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
@@ -98,7 +99,8 @@ public class MetadataConstraints implements Constraint {
           FutureLocationColumnFamily.NAME,
           ClonedColumnFamily.NAME,
           ExternalCompactionColumnFamily.NAME,
-          UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME
+          UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME,
+          MergedColumnFamily.NAME
       );
   // @formatter:on
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
index fc15a0b39c..1ab0c6d777 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java
@@ -33,10 +33,15 @@ public enum MergeState {
    */
   WAITING_FOR_OFFLINE,
   /**
-   * when the number of chopped, offline tablets equals the number of merge 
tablets, begin the
-   * metadata updates
+   * when the number of offline tablets equals the number of merge tablets, 
begin the metadata
+   * updates
    */
   MERGING,
+  /**
+   * when the operation has finished metadata updates for merge. We can now 
remove the merged
+   * tablets and clear the MERGED marker. Not used for delete
+   */
+  MERGED,
   /**
    * merge is complete, the resulting tablet can be brought online, remove the 
marker in zookeeper
    */
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8142af7c3f..b75c8601c4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -657,6 +657,7 @@ public class Manager extends AbstractServer
                 return TabletGoalState.UNASSIGNED;
               }
             case MERGING:
+            case MERGED:
               return TabletGoalState.UNASSIGNED;
           }
         } else {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index b99bf3473b..c4e29eaf69 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -77,6 +77,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -585,15 +586,28 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
           try {
             if (stats.getMergeInfo().isDelete()) {
               deleteTablets(stats.getMergeInfo());
+              // For delete we are done and can skip to COMPLETE
+              update = MergeState.COMPLETE;
             } else {
               mergeMetadataRecords(stats.getMergeInfo());
+              // For merge we need another state to delete the tablets
+              // and clear the marker
+              update = MergeState.MERGED;
             }
-            update = MergeState.COMPLETE;
             manager.setMergeState(stats.getMergeInfo(), update);
           } catch (Exception ex) {
             Manager.log.error("Unable merge metadata table records", ex);
           }
         }
+
+        // If the state is MERGED then we are finished with metadata updates
+        if (update == MergeState.MERGED) {
+          // Finish the merge operation by deleting the merged tablets and
+          // cleaning up the marker that was used for merge
+          deleteMergedTablets(stats.getMergeInfo());
+          update = MergeState.COMPLETE;
+          manager.setMergeState(stats.getMergeInfo(), update);
+        }
       } catch (Exception ex) {
         Manager.log.error(
             "Unable to update merge state for merge " + 
stats.getMergeInfo().getExtent(), ex);
@@ -601,6 +615,16 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
+  // Remove the merged marker from the last tablet in the merge range
+  private void clearMerged(MergeInfo mergeInfo, BatchWriter bw, HighTablet 
highTablet)
+      throws AccumuloException {
+    Manager.log.debug("Clearing MERGED marker for {}", mergeInfo.getExtent());
+    var m = new Mutation(highTablet.getExtent().toMetaRow());
+    MergedColumnFamily.MERGED_COLUMN.putDelete(m);
+    bw.addMutation(m);
+    bw.flush();
+  }
+
   // This method finds returns the deletion starting row (exclusive) for 
tablets that
   // need to be actually deleted. If the startTablet is null then
   // the deletion start row will just be null as all tablets are being deleted
@@ -720,7 +744,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     if (extent.endRow() != null) {
       Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW);
       followingTablet =
-          getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), 
extent.endRow()));
+          getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), 
extent.endRow()))
+              .getExtent();
       Manager.log.debug("Found following tablet {}", followingTablet);
     }
     try {
@@ -805,7 +830,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
     KeyExtent range = info.getExtent();
     Manager.log.debug("Merging metadata for {}", range);
-    KeyExtent stop = getHighTablet(range);
+    HighTablet highTablet = getHighTablet(range);
+    KeyExtent stop = highTablet.getExtent();
     Manager.log.debug("Highest tablet is {}", stop);
     Value firstPrevRowValue = null;
     Text stopRow = stop.toMetaRow();
@@ -813,8 +839,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     if (start == null) {
       start = new Text();
     }
-    Range scanRange =
-        new Range(TabletsSection.encodeRow(range.tableId(), start), false, 
stopRow, false);
+
     String targetSystemTable = MetadataTable.NAME;
     if (range.isMeta()) {
       targetSystemTable = RootTable.NAME;
@@ -826,6 +851,13 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     KeyExtent previousKeyExtent = null;
     KeyExtent lastExtent = null;
 
+    // Check if we have already previously fenced the tablets
+    if (highTablet.isMerged()) {
+      Manager.log.debug("tablet metadata already fenced for merge {}", range);
+      // Return as we already fenced the files
+      return;
+    }
+
     try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
       long fileCount = 0;
       // Make file entries in highest tablet
@@ -954,28 +986,62 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       // delete any entries for external compactions
       extCompIds.forEach(ecid -> 
m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid));
 
-      if (!m.getUpdates().isEmpty()) {
-        bw.addMutation(m);
-      }
+      // Add a marker so we know the tablets have been fenced in case the 
merge operation
+      // needs to be recovered and restarted to finish later.
+      MergedColumnFamily.MERGED_COLUMN.put(m, MergedColumnFamily.MERGED_VALUE);
 
+      // Add the prev row column update to the same mutation as the
+      // file updates so it will be atomic and only update the prev row
+      // if the tablets were fenced
+      Preconditions.checkState(firstPrevRowValue != null,
+          "Previous row entry for lowest tablet was not found.");
+      stop = new KeyExtent(stop.tableId(), stop.endRow(),
+          TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
+      TabletColumnFamily.PREV_ROW_COLUMN.put(m,
+          TabletColumnFamily.encodePrevEndRow(stop.prevEndRow()));
+      Manager.log.debug("Setting the prevRow for last tablet: {}", stop);
+      bw.addMutation(m);
       bw.flush();
 
       Manager.log.debug("Moved {} files to {}", fileCount, stop);
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
 
-      if (firstPrevRowValue == null) {
-        Manager.log.debug("tablet already merged");
-        return;
-      }
+  private void deleteMergedTablets(MergeInfo info) throws AccumuloException {
+    KeyExtent range = info.getExtent();
+    Manager.log.debug("Deleting merged tablets for {}", range);
+    HighTablet highTablet = getHighTablet(range);
+    if (!highTablet.isMerged()) {
+      Manager.log.debug("Tablets have already been deleted for merge with 
range {}, returning",
+          range);
+      return;
+    }
 
-      stop = new KeyExtent(stop.tableId(), stop.endRow(),
-          TabletColumnFamily.decodePrevEndRow(firstPrevRowValue));
-      Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop);
-      Manager.log.debug("Setting the prevRow for last tablet: {}", stop);
-      bw.addMutation(updatePrevRow);
-      bw.flush();
+    KeyExtent stop = highTablet.getExtent();
+    Manager.log.debug("Highest tablet is {}", stop);
+
+    Text stopRow = stop.toMetaRow();
+    Text start = range.prevEndRow();
+    if (start == null) {
+      start = new Text();
+    }
+    Range scanRange =
+        new Range(TabletsSection.encodeRow(range.tableId(), start), false, 
stopRow, false);
+    String targetSystemTable = MetadataTable.NAME;
+    if (range.isMeta()) {
+      targetSystemTable = RootTable.NAME;
+    }
+
+    AccumuloClient client = manager.getContext();
 
+    try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) {
+      // Continue and delete the tablets that were merged
       deleteTablets(info, scanRange, bw, client);
 
+      // Clear the merged marker after we finish deleting tablets
+      clearMerged(info, bw, highTablet);
     } catch (Exception ex) {
       throw new AccumuloException(ex);
     }
@@ -1206,6 +1272,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     // group all deletes into tablet into one mutation, this makes tablets
     // either disappear entirely or not all.. this is important for the case
     // where the process terminates in the loop below...
+    Manager.log.debug("Inside delete tablets");
     scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME 
: MetadataTable.NAME,
         Authorizations.EMPTY);
     Manager.log.debug("Deleting range {}", scanRange);
@@ -1236,24 +1303,47 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         || key.getColumnFamily().equals(FutureLocationColumnFamily.NAME);
   }
 
-  private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
+  private HighTablet getHighTablet(KeyExtent range) throws AccumuloException {
     try {
       AccumuloClient client = manager.getContext();
       Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : 
MetadataTable.NAME,
           Authorizations.EMPTY);
       TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+      MergedColumnFamily.MERGED_COLUMN.fetch(scanner);
       KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null);
       scanner.setRange(new Range(start.toMetaRow(), null));
       Iterator<Entry<Key,Value>> iterator = scanner.iterator();
       if (!iterator.hasNext()) {
         throw new AccumuloException("No last tablet for a merge " + range);
       }
-      Entry<Key,Value> entry = iterator.next();
-      KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry);
-      if (!highTablet.tableId().equals(range.tableId())) {
+
+      KeyExtent highTablet = null;
+      boolean merged = false;
+      Text firstRow = null;
+
+      while (iterator.hasNext()) {
+        Entry<Key,Value> entry = iterator.next();
+        if (firstRow == null) {
+          firstRow = entry.getKey().getRow();
+        }
+        if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
+          Preconditions.checkState(entry.getKey().getRow().equals(firstRow),
+              "Row " + entry.getKey().getRow() + " does not match first row 
seen " + firstRow);
+          highTablet = KeyExtent.fromMetaPrevRow(entry);
+          Manager.log.debug("found high tablet: {}", entry.getKey());
+          break;
+        } else if 
(MergedColumnFamily.MERGED_COLUMN.hasColumns(entry.getKey())) {
+          Preconditions.checkState(entry.getKey().getRow().equals(firstRow),
+              "Row " + entry.getKey().getRow() + " does not match first row 
seen " + firstRow);
+          Manager.log.debug("is merged true: {}", entry.getKey());
+          merged = true;
+        }
+      }
+
+      if (highTablet == null || !highTablet.tableId().equals(range.tableId())) 
{
         throw new AccumuloException("No last tablet for merge " + range + " " 
+ highTablet);
       }
-      return highTablet;
+      return new HighTablet(highTablet, merged);
     } catch (Exception ex) {
       throw new AccumuloException("Unexpected failure finding the last tablet 
for a merge " + range,
           ex);
@@ -1353,4 +1443,22 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
+  @VisibleForTesting
+  protected static class HighTablet {
+    private final KeyExtent extent;
+    private final boolean merged;
+
+    public HighTablet(KeyExtent extent, boolean merged) {
+      this.extent = Objects.requireNonNull(extent);
+      this.merged = merged;
+    }
+
+    public boolean isMerged() {
+      return merged;
+    }
+
+    public KeyExtent getExtent() {
+      return extent;
+    }
+  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index 0f1bcbee0c..3f136628cd 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -124,7 +124,7 @@ public class MergeStats {
             info.getExtent());
       }
     }
-    if (state == MergeState.MERGING) {
+    if (state == MergeState.MERGING || state == MergeState.MERGED) {
       if (hosted != 0) {
         // Shouldn't happen
         log.error("Unexpected state: hosted tablets should be zero {} merge 
{}", hosted,
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
index 35026efbe9..0f693ad50f 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java
@@ -19,9 +19,16 @@
 package org.apache.accumulo.manager;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.manager.TabletGroupWatcher.HighTablet;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 
 public class TabletGroupWatcherTest {
@@ -64,4 +71,17 @@ public class TabletGroupWatcherTest {
     assertEquals(1, newValues.getSecond().getNumEntries());
     assertEquals(original.getTime(), newValues.getSecond().getTime());
   }
+
+  @Test
+  public void testHighTablet() {
+    HighTablet mergedTruePrevRowFalse =
+        new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), 
true);
+    assertNotNull(mergedTruePrevRowFalse.getExtent());
+    assertTrue(mergedTruePrevRowFalse.isMerged());
+
+    HighTablet mergedFalsePrevRowFalse =
+        new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), 
false);
+    assertNotNull(mergedFalsePrevRowFalse.getExtent());
+    assertFalse(mergedFalsePrevRowFalse.isMerged());
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java 
b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
index aeee11e9e9..013a1e7dbd 100644
--- a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test;
 
 import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles;
+import static 
org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -145,21 +146,29 @@ public class MetaSplitIT extends AccumuloClusterHarness {
         // Merging tablets should produce fenced files because of no-chop merge
         assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 
0);
         verifyMetadataTableScan(client);
+        // Verify that the MERGED marker was cleared and doesn't exist on any 
tablet
+        verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
 
         addSplits(opts, "44 55 66 77 88".split(" "));
         checkMetadataSplits(9, opts);
         assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 
0);
         verifyMetadataTableScan(client);
+        // Verify that the MERGED marker was cleared and doesn't exist on any 
tablet
+        verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
 
         opts.merge(MetadataTable.NAME, new Text("5"), new Text("7"));
         checkMetadataSplits(6, opts);
         assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 
0);
         verifyMetadataTableScan(client);
+        // Verify that the MERGED marker was cleared and doesn't exist on any 
tablet
+        verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
 
         opts.merge(MetadataTable.NAME, null, null);
         checkMetadataSplits(0, opts);
         assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 
0);
         verifyMetadataTableScan(client);
+        // Verify that the MERGED marker was cleared and doesn't exist on any 
tablet
+        verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID);
 
         opts.compact(MetadataTable.NAME, new CompactionConfig());
         // Should be no more fenced files after compaction
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
index c8e8f93de5..4d265a1120 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.functional;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata;
+import static 
org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -97,6 +98,9 @@ public class MergeIT extends AccumuloClusterHarness {
       c.tableOperations().flush(tableName, null, null, true);
       c.tableOperations().merge(tableName, new Text("c1"), new Text("f1"));
       assertEquals(8, c.tableOperations().listSplits(tableName).size());
+      // Verify that the MERGED marker was cleared
+      verifyMergedMarkerCleared(getServerContext(),
+          TableId.of(c.tableOperations().tableIdMap().get(tableName)));
     }
   }
 
@@ -189,6 +193,9 @@ public class MergeIT extends AccumuloClusterHarness {
       verify(c, 100, 201, tableName);
       verifyNoRows(c, 100, 301, tableName);
       verify(c, 600, 401, tableName);
+
+      // Verify that the MERGED marker was cleared
+      verifyMergedMarkerCleared(getServerContext(), tableId);
     }
   }
 
@@ -251,6 +258,8 @@ public class MergeIT extends AccumuloClusterHarness {
       c.tableOperations().merge(tableName, null, null);
       log.debug("Metadata after Merge");
       printAndVerifyFileMetadata(getServerContext(), tableId, 12);
+      // Verify that the MERGED marker was cleared
+      verifyMergedMarkerCleared(getServerContext(), tableId);
 
       // Verify that the deleted rows can't be read after merge
       verify(c, 150, 1, tableName);
@@ -332,6 +341,8 @@ public class MergeIT extends AccumuloClusterHarness {
       c.tableOperations().merge(tableName, null, null);
       log.debug("Metadata after second Merge");
       printAndVerifyFileMetadata(getServerContext(), tableId, -1);
+      // Verify that the MERGED marker was cleared
+      verifyMergedMarkerCleared(getServerContext(), tableId);
 
       // Verify that the deleted rows can't be read after merge
       verify(c, 150, 1, tableName);
@@ -382,6 +393,8 @@ public class MergeIT extends AccumuloClusterHarness {
       c.tableOperations().merge(tableName, null, null);
       log.debug("Metadata after Merge");
       printAndVerifyFileMetadata(getServerContext(), tableId, -1);
+      // Verify that the MERGED marker was cleared
+      verifyMergedMarkerCleared(getServerContext(), tableId);
 
       // Verify that the deleted rows can't be read after merge
       verify(c, 100, 1, tableName);
@@ -518,16 +531,17 @@ public class MergeIT extends AccumuloClusterHarness {
 
     log.debug("Before Merge");
     client.tableOperations().flush(table, null, null, true);
-    printAndVerifyFileMetadata(getServerContext(),
-        TableId.of(client.tableOperations().tableIdMap().get(table)));
+    TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(table));
+    printAndVerifyFileMetadata(getServerContext(), tableId);
 
     client.tableOperations().merge(table, start == null ? null : new 
Text(start),
         end == null ? null : new Text(end));
 
     client.tableOperations().flush(table, null, null, true);
     log.debug("After Merge");
-    printAndVerifyFileMetadata(getServerContext(),
-        TableId.of(client.tableOperations().tableIdMap().get(table)));
+    printAndVerifyFileMetadata(getServerContext(), tableId);
+    // Verify that the MERGED marker was cleared
+    verifyMergedMarkerCleared(getServerContext(), tableId);
 
     try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java 
b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
index 8d94ff3506..bfc39c9e36 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test.util;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -169,6 +170,14 @@ public class FileMetadataUtil {
     ctx.tableOperations().online(tableName, true);
   }
 
+  // Verifies that the MERGED marker was cleared and doesn't exist on any 
tablet
+  public static void verifyMergedMarkerCleared(final ServerContext ctx, 
TableId tableId) {
+    try (var tabletsMetadata =
+        
ctx.getAmple().readTablets().forTable(tableId).fetch(ColumnType.MERGED).build())
 {
+      
assertTrue(tabletsMetadata.stream().noneMatch(TabletMetadata::hasMerged));
+    }
+  }
+
   public interface FileMutator {
     void mutate(TabletMetadata tm, TabletMutator mutator, StoredTabletFile 
file,
         DataFileValue value);


Reply via email to