This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new b8089dd528 Add a merge column marker to make merge idempotent (#3957) b8089dd528 is described below commit b8089dd528dd959fa3eeea23063d47c61cacedeb Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Sat Nov 18 15:03:05 2023 -0500 Add a merge column marker to make merge idempotent (#3957) The new MERGE marker allows correctly resuming metadata updates for no-chop merge if there was a failure and resume and makes merge idempotent Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/metadata/schema/MetadataSchema.java | 12 ++ .../core/metadata/schema/TabletMetadata.java | 13 +- .../core/metadata/schema/TabletsMetadata.java | 4 + .../core/metadata/schema/TabletMetadataTest.java | 29 ++++ .../server/constraints/MetadataConstraints.java | 4 +- .../accumulo/server/manager/state/MergeState.java | 9 +- .../java/org/apache/accumulo/manager/Manager.java | 1 + .../accumulo/manager/TabletGroupWatcher.java | 154 ++++++++++++++++++--- .../apache/accumulo/manager/state/MergeStats.java | 2 +- .../accumulo/manager/TabletGroupWatcherTest.java | 20 +++ .../java/org/apache/accumulo/test/MetaSplitIT.java | 9 ++ .../apache/accumulo/test/functional/MergeIT.java | 22 ++- .../accumulo/test/util/FileMetadataUtil.java | 9 ++ 13 files changed, 256 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 0cb57e586f..2bc0a6e18e 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -325,6 +325,18 @@ public class MetadataSchema { public static final String STR_NAME = "ecomp"; public static final Text NAME = new Text(STR_NAME); } + + /** + * Column family for indicating that the files in a tablet contain fenced 
files that have been + * merged from other tablets during a merge operation. This is used to support resuming a failed + * merge operation. + */ + public static class MergedColumnFamily { + public static final String STR_NAME = "merged"; + public static final Text NAME = new Text(STR_NAME); + public static final ColumnFQ MERGED_COLUMN = new ColumnFQ(NAME, new Text(STR_NAME)); + public static final Value MERGED_VALUE = new Value("merged"); + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index fccc3e6c43..af31605d51 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; @@ -104,6 +105,7 @@ public class TabletMetadata { private OptionalLong compact = OptionalLong.empty(); private Double splitRatio = null; private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions; + private boolean merged; public enum LocationType { CURRENT, FUTURE, LAST @@ -125,7 +127,8 @@ public class TabletMetadata { COMPACT_ID, 
SPLIT_RATIO, SUSPEND, - ECOMP + ECOMP, + MERGED } public static class Location { @@ -345,6 +348,11 @@ public class TabletMetadata { return splitRatio; } + public boolean hasMerged() { + ensureFetched(ColumnType.MERGED); + return merged; + } + public SortedMap<Key,Value> getKeyValues() { Preconditions.checkState(keyValues != null, "Requested key values when it was not saved"); return keyValues; @@ -479,6 +487,9 @@ public class TabletMetadata { extCompBuilder.put(ExternalCompactionId.of(qual), ExternalCompactionMetadata.fromJson(val)); break; + case MergedColumnFamily.STR_NAME: + te.merged = true; + break; default: throw new IllegalStateException("Unexpected family " + fam); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index d8811234df..515e6a08ef 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -76,6 +76,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -336,6 +337,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable case ECOMP: 
families.add(ExternalCompactionColumnFamily.NAME); break; + case MERGED: + families.add(MergedColumnFamily.NAME); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index a3a3d33a45..d547b8596d 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -20,6 +20,8 @@ package org.apache.accumulo.core.metadata.schema; import static java.util.stream.Collectors.toSet; import static org.apache.accumulo.core.metadata.StoredTabletFile.serialize; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_VALUE; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; @@ -113,6 +115,8 @@ public class TabletMetadataTest { mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put(""); mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put(""); + MERGED_COLUMN.put(mutation, new Value()); + SortedMap<Key,Value> rowMap = toRowMap(mutation); TabletMetadata tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), @@ -143,6 +147,7 @@ public class TabletMetadataTest { assertTrue(tm.sawPrevEndRow()); assertEquals("M123456789", tm.getTime().encode()); assertEquals(Set.of(sf1, sf2), Set.copyOf(tm.getScans())); + 
assertTrue(tm.hasMerged()); } @Test @@ -258,6 +263,30 @@ public class TabletMetadataTest { assertFalse(tm.hasCurrent()); } + @Test + public void testMergedColumn() { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + // Test merged column set + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + MERGED_COLUMN.put(mutation, MERGED_VALUE); + TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.MERGED), true); + assertTrue(tm.hasMerged()); + + // Column not set + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.MERGED), true); + assertFalse(tm.hasMerged()); + + // MERGED Column not fetched + mutation = TabletColumnFamily.createPrevRowMutation(extent); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), + EnumSet.of(ColumnType.PREV_ROW), true); + assertThrows(IllegalStateException.class, tm::hasMerged); + } + private SortedMap<Key,Value> toRowMap(Mutation mutation) { SortedMap<Key,Value> rowMap = new TreeMap<>(); mutation.getUpdates().forEach(cu -> { diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 936d7f16ef..ca45a05e8a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ex import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; @@ -98,7 +99,8 @@ public class MetadataConstraints implements Constraint { FutureLocationColumnFamily.NAME, ClonedColumnFamily.NAME, ExternalCompactionColumnFamily.NAME, - UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME + UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME, + MergedColumnFamily.NAME ); // @formatter:on diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java index fc15a0b39c..1ab0c6d777 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java @@ -33,10 +33,15 @@ public enum MergeState { */ WAITING_FOR_OFFLINE, /** - * when the number of chopped, offline tablets equals the number of merge tablets, begin the - * metadata updates + * when the number of offline tablets equals the number of merge tablets, begin the metadata + * updates */ MERGING, + /** + * when the operation has finished metadata updates for merge. We can now remove the merged + * tablets and clear the MERGED marker. 
Not used for delete + */ + MERGED, /** * merge is complete, the resulting tablet can be brought online, remove the marker in zookeeper */ diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 8142af7c3f..b75c8601c4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -657,6 +657,7 @@ public class Manager extends AbstractServer return TabletGoalState.UNASSIGNED; } case MERGING: + case MERGED: return TabletGoalState.UNASSIGNED; } } else { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index b99bf3473b..c4e29eaf69 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -77,6 +77,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -585,15 +586,28 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { try { if (stats.getMergeInfo().isDelete()) { deleteTablets(stats.getMergeInfo()); + // For delete we 
are done and can skip to COMPLETE + update = MergeState.COMPLETE; } else { mergeMetadataRecords(stats.getMergeInfo()); + // For merge we need another state to delete the tablets + // and clear the marker + update = MergeState.MERGED; } - update = MergeState.COMPLETE; manager.setMergeState(stats.getMergeInfo(), update); } catch (Exception ex) { Manager.log.error("Unable merge metadata table records", ex); } } + + // If the state is MERGED then we are finished with metadata updates + if (update == MergeState.MERGED) { + // Finish the merge operation by deleting the merged tablets and + // cleaning up the marker that was used for merge + deleteMergedTablets(stats.getMergeInfo()); + update = MergeState.COMPLETE; + manager.setMergeState(stats.getMergeInfo(), update); + } } catch (Exception ex) { Manager.log.error( "Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex); @@ -601,6 +615,16 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } + // Remove the merged marker from the last tablet in the merge range + private void clearMerged(MergeInfo mergeInfo, BatchWriter bw, HighTablet highTablet) + throws AccumuloException { + Manager.log.debug("Clearing MERGED marker for {}", mergeInfo.getExtent()); + var m = new Mutation(highTablet.getExtent().toMetaRow()); + MergedColumnFamily.MERGED_COLUMN.putDelete(m); + bw.addMutation(m); + bw.flush(); + } + // This method returns the deletion starting row (exclusive) for tablets that // need to be actually deleted. 
If the startTablet is null then // the deletion start row will just be null as all tablets are being deleted @@ -720,7 +744,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { if (extent.endRow() != null) { Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW); followingTablet = - getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow())); + getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow())) + .getExtent(); Manager.log.debug("Found following tablet {}", followingTablet); } try { @@ -805,7 +830,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private void mergeMetadataRecords(MergeInfo info) throws AccumuloException { KeyExtent range = info.getExtent(); Manager.log.debug("Merging metadata for {}", range); - KeyExtent stop = getHighTablet(range); + HighTablet highTablet = getHighTablet(range); + KeyExtent stop = highTablet.getExtent(); Manager.log.debug("Highest tablet is {}", stop); Value firstPrevRowValue = null; Text stopRow = stop.toMetaRow(); @@ -813,8 +839,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { if (start == null) { start = new Text(); } - Range scanRange = - new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false); + String targetSystemTable = MetadataTable.NAME; if (range.isMeta()) { targetSystemTable = RootTable.NAME; @@ -826,6 +851,13 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { KeyExtent previousKeyExtent = null; KeyExtent lastExtent = null; + // Check if we have already previously fenced the tablets + if (highTablet.isMerged()) { + Manager.log.debug("tablet metadata already fenced for merge {}", range); + // Return as we already fenced the files + return; + } + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { long fileCount = 0; // Make file entries in highest tablet @@ -954,28 +986,62 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread { // delete any entries for external compactions extCompIds.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid)); - if (!m.getUpdates().isEmpty()) { - bw.addMutation(m); - } + // Add a marker so we know the tablets have been fenced in case the merge operation + // needs to be recovered and restarted to finish later. + MergedColumnFamily.MERGED_COLUMN.put(m, MergedColumnFamily.MERGED_VALUE); + // Add the prev row column update to the same mutation as the + // file updates so it will be atomic and only update the prev row + // if the tablets were fenced + Preconditions.checkState(firstPrevRowValue != null, + "Previous row entry for lowest tablet was not found."); + stop = new KeyExtent(stop.tableId(), stop.endRow(), + TabletColumnFamily.decodePrevEndRow(firstPrevRowValue)); + TabletColumnFamily.PREV_ROW_COLUMN.put(m, + TabletColumnFamily.encodePrevEndRow(stop.prevEndRow())); + Manager.log.debug("Setting the prevRow for last tablet: {}", stop); + bw.addMutation(m); bw.flush(); Manager.log.debug("Moved {} files to {}", fileCount, stop); + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } - if (firstPrevRowValue == null) { - Manager.log.debug("tablet already merged"); - return; - } + private void deleteMergedTablets(MergeInfo info) throws AccumuloException { + KeyExtent range = info.getExtent(); + Manager.log.debug("Deleting merged tablets for {}", range); + HighTablet highTablet = getHighTablet(range); + if (!highTablet.isMerged()) { + Manager.log.debug("Tablets have already been deleted for merge with range {}, returning", + range); + return; + } - stop = new KeyExtent(stop.tableId(), stop.endRow(), - TabletColumnFamily.decodePrevEndRow(firstPrevRowValue)); - Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop); - Manager.log.debug("Setting the prevRow for last tablet: {}", stop); - bw.addMutation(updatePrevRow); - bw.flush(); + KeyExtent stop = highTablet.getExtent(); + 
Manager.log.debug("Highest tablet is {}", stop); + + Text stopRow = stop.toMetaRow(); + Text start = range.prevEndRow(); + if (start == null) { + start = new Text(); + } + Range scanRange = + new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false); + String targetSystemTable = MetadataTable.NAME; + if (range.isMeta()) { + targetSystemTable = RootTable.NAME; + } + + AccumuloClient client = manager.getContext(); + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { + // Continue and delete the tablets that were merged deleteTablets(info, scanRange, bw, client); + // Clear the merged marker after we finish deleting tablets + clearMerged(info, bw, highTablet); } catch (Exception ex) { throw new AccumuloException(ex); } @@ -1206,6 +1272,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // group all deletes into tablet into one mutation, this makes tablets // either disappear entirely or not all.. this is important for the case // where the process terminates in the loop below... + Manager.log.debug("Inside delete tablets"); scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); Manager.log.debug("Deleting range {}", scanRange); @@ -1236,24 +1303,47 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { || key.getColumnFamily().equals(FutureLocationColumnFamily.NAME); } - private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { + private HighTablet getHighTablet(KeyExtent range) throws AccumuloException { try { AccumuloClient client = manager.getContext(); Scanner scanner = client.createScanner(range.isMeta() ? 
RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + MergedColumnFamily.MERGED_COLUMN.fetch(scanner); KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null); scanner.setRange(new Range(start.toMetaRow(), null)); Iterator<Entry<Key,Value>> iterator = scanner.iterator(); if (!iterator.hasNext()) { throw new AccumuloException("No last tablet for a merge " + range); } - Entry<Key,Value> entry = iterator.next(); - KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry); - if (!highTablet.tableId().equals(range.tableId())) { + + KeyExtent highTablet = null; + boolean merged = false; + Text firstRow = null; + + while (iterator.hasNext()) { + Entry<Key,Value> entry = iterator.next(); + if (firstRow == null) { + firstRow = entry.getKey().getRow(); + } + if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) { + Preconditions.checkState(entry.getKey().getRow().equals(firstRow), + "Row " + entry.getKey().getRow() + " does not match first row seen " + firstRow); + highTablet = KeyExtent.fromMetaPrevRow(entry); + Manager.log.debug("found high tablet: {}", entry.getKey()); + break; + } else if (MergedColumnFamily.MERGED_COLUMN.hasColumns(entry.getKey())) { + Preconditions.checkState(entry.getKey().getRow().equals(firstRow), + "Row " + entry.getKey().getRow() + " does not match first row seen " + firstRow); + Manager.log.debug("is merged true: {}", entry.getKey()); + merged = true; + } + } + + if (highTablet == null || !highTablet.tableId().equals(range.tableId())) { throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); } - return highTablet; + return new HighTablet(highTablet, merged); } catch (Exception ex) { throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex); @@ -1353,4 +1443,22 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } + @VisibleForTesting + protected static class HighTablet { + 
private final KeyExtent extent; + private final boolean merged; + + public HighTablet(KeyExtent extent, boolean merged) { + this.extent = Objects.requireNonNull(extent); + this.merged = merged; + } + + public boolean isMerged() { + return merged; + } + + public KeyExtent getExtent() { + return extent; + } + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java index 0f1bcbee0c..3f136628cd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java @@ -124,7 +124,7 @@ public class MergeStats { info.getExtent()); } } - if (state == MergeState.MERGING) { + if (state == MergeState.MERGING || state == MergeState.MERGED) { if (hosted != 0) { // Shouldn't happen log.error("Unexpected state: hosted tablets should be zero {} merge {}", hosted, diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java index 35026efbe9..0f693ad50f 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java @@ -19,9 +19,16 @@ package org.apache.accumulo.manager; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.manager.TabletGroupWatcher.HighTablet; +import 
org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; public class TabletGroupWatcherTest { @@ -64,4 +71,17 @@ public class TabletGroupWatcherTest { assertEquals(1, newValues.getSecond().getNumEntries()); assertEquals(original.getTime(), newValues.getSecond().getTime()); } + + @Test + public void testHighTablet() { + HighTablet mergedTruePrevRowFalse = + new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), true); + assertNotNull(mergedTruePrevRowFalse.getExtent()); + assertTrue(mergedTruePrevRowFalse.isMerged()); + + HighTablet mergedFalsePrevRowFalse = + new HighTablet(new KeyExtent(MetadataTable.ID, new Text("end"), null), false); + assertNotNull(mergedFalsePrevRowFalse.getExtent()); + assertFalse(mergedFalsePrevRowFalse.isMerged()); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java index aeee11e9e9..013a1e7dbd 100644 --- a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test; import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles; +import static org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -145,21 +146,29 @@ public class MetaSplitIT extends AccumuloClusterHarness { // Merging tablets should produce fenced files because of no-chop merge assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0); verifyMetadataTableScan(client); + // Verify that the MERGED marker was cleared and doesn't exist on any tablet + verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID); addSplits(opts, "44 55 66 77 88".split(" ")); checkMetadataSplits(9, opts); 
assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0); verifyMetadataTableScan(client); + // Verify that the MERGED marker was cleared and doesn't exist on any tablet + verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID); opts.merge(MetadataTable.NAME, new Text("5"), new Text("7")); checkMetadataSplits(6, opts); assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0); verifyMetadataTableScan(client); + // Verify that the MERGED marker was cleared and doesn't exist on any tablet + verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID); opts.merge(MetadataTable.NAME, null, null); checkMetadataSplits(0, opts); assertTrue(countFencedFiles(getServerContext(), MetadataTable.NAME) > 0); verifyMetadataTableScan(client); + // Verify that the MERGED marker was cleared and doesn't exist on any tablet + verifyMergedMarkerCleared(getServerContext(), MetadataTable.ID); opts.compact(MetadataTable.NAME, new CompactionConfig()); // Should be no more fenced files after compaction diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java index c8e8f93de5..4d265a1120 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java @@ -20,6 +20,7 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata; +import static org.apache.accumulo.test.util.FileMetadataUtil.verifyMergedMarkerCleared; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -97,6 +98,9 @@ public class MergeIT extends AccumuloClusterHarness { c.tableOperations().flush(tableName, null, null, true); 
c.tableOperations().merge(tableName, new Text("c1"), new Text("f1")); assertEquals(8, c.tableOperations().listSplits(tableName).size()); + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), + TableId.of(c.tableOperations().tableIdMap().get(tableName))); } } @@ -189,6 +193,9 @@ public class MergeIT extends AccumuloClusterHarness { verify(c, 100, 201, tableName); verifyNoRows(c, 100, 301, tableName); verify(c, 600, 401, tableName); + + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), tableId); } } @@ -251,6 +258,8 @@ public class MergeIT extends AccumuloClusterHarness { c.tableOperations().merge(tableName, null, null); log.debug("Metadata after Merge"); printAndVerifyFileMetadata(getServerContext(), tableId, 12); + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), tableId); // Verify that the deleted rows can't be read after merge verify(c, 150, 1, tableName); @@ -332,6 +341,8 @@ public class MergeIT extends AccumuloClusterHarness { c.tableOperations().merge(tableName, null, null); log.debug("Metadata after second Merge"); printAndVerifyFileMetadata(getServerContext(), tableId, -1); + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), tableId); // Verify that the deleted rows can't be read after merge verify(c, 150, 1, tableName); @@ -382,6 +393,8 @@ public class MergeIT extends AccumuloClusterHarness { c.tableOperations().merge(tableName, null, null); log.debug("Metadata after Merge"); printAndVerifyFileMetadata(getServerContext(), tableId, -1); + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), tableId); // Verify that the deleted rows can't be read after merge verify(c, 100, 1, tableName); @@ -518,16 +531,17 @@ public class MergeIT extends AccumuloClusterHarness { log.debug("Before Merge"); client.tableOperations().flush(table, null, null, true); - 
printAndVerifyFileMetadata(getServerContext(), - TableId.of(client.tableOperations().tableIdMap().get(table))); + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(table)); + printAndVerifyFileMetadata(getServerContext(), tableId); client.tableOperations().merge(table, start == null ? null : new Text(start), end == null ? null : new Text(end)); client.tableOperations().flush(table, null, null, true); log.debug("After Merge"); - printAndVerifyFileMetadata(getServerContext(), - TableId.of(client.tableOperations().tableIdMap().get(table))); + printAndVerifyFileMetadata(getServerContext(), tableId); + // Verify that the MERGED marker was cleared + verifyMergedMarkerCleared(getServerContext(), tableId); try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { diff --git a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java index 8d94ff3506..bfc39c9e36 100644 --- a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.util; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashMap; import java.util.Map; @@ -169,6 +170,14 @@ public class FileMetadataUtil { ctx.tableOperations().online(tableName, true); } + // Verifies that the MERGED marker was cleared and doesn't exist on any tablet + public static void verifyMergedMarkerCleared(final ServerContext ctx, TableId tableId) { + try (var tabletsMetadata = + ctx.getAmple().readTablets().forTable(tableId).fetch(ColumnType.MERGED).build()) { + assertTrue(tabletsMetadata.stream().noneMatch(TabletMetadata::hasMerged)); + } + } + public interface FileMutator { void mutate(TabletMetadata tm, TabletMutator mutator, StoredTabletFile file, DataFileValue value);