This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c615abf55b Update elasticity MergeTablets to use MERGED marker (#3975) c615abf55b is described below commit c615abf55b3ccf1bd32d0645797f14286911df07 Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Tue Nov 28 18:32:06 2023 -0500 Update elasticity MergeTablets to use MERGED marker (#3975) The MERGED marker is set on the last tablet of the merged range after metadata has been updated so that it is possible to know whether or not the files were already fenced. If the marker exists then the fencing for the last tablet can be skipped as the process was restarted. --- .../apache/accumulo/core/metadata/schema/Ample.java | 4 ++++ .../core/metadata/schema/TabletMetadataBuilder.java | 13 +++++++++++++ .../core/metadata/schema/TabletMutatorBase.java | 13 +++++++++++++ .../core/metadata/schema/TabletMetadataTest.java | 18 ++++++++++-------- .../accumulo/manager/tableOps/merge/DeleteTablets.java | 10 +++++++--- .../accumulo/manager/tableOps/merge/MergeTablets.java | 16 +++++++++------- 6 files changed, 56 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index d9bd1b0f07..5c68eba658 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -414,6 +414,10 @@ public interface Ample { * is not empty */ T deleteAll(Set<Key> keys); + + T setMerged(); + + T deleteMerged(); } interface TabletMutator extends TabletUpdates<TabletMutator> { diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index c393a552b4..b9c2d9ae55 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -29,6 +29,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; @@ -273,6 +274,18 @@ public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadata throw new UnsupportedOperationException(); } + @Override + public TabletMetadataBuilder setMerged() { + fetched.add(MERGED); + internalBuilder.setMerged(); + return this; + } + + @Override + public TabletMetadataBuilder deleteMerged() { + throw new UnsupportedOperationException(); + } + /** * @param extraFetched Anything that was put on the builder will automatically be added to the * fetched set. However, for the case where something was not put and it needs to be diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 18631f633a..86d0569fec 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Fu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; @@ -329,6 +330,18 @@ public abstract class TabletMutatorBase<T extends Ample.TabletUpdates<T>> return getThis(); } + @Override + public T setMerged() { + MergedColumnFamily.MERGED_COLUMN.put(mutation, MergedColumnFamily.MERGED_VALUE); + return getThis(); + } + + @Override + public T deleteMerged() { + MergedColumnFamily.MERGED_COLUMN.putDelete(mutation); + return getThis(); + } + public void setCloseAfterMutate(AutoCloseable closeable) { this.closeAfterMutate = closeable; } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 61c2dded74..b8d73c3e8f 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -32,6 +32,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -287,13 +288,13 @@ public class TabletMetadataTest { Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); MERGED_COLUMN.put(mutation, MERGED_VALUE); TabletMetadata tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), - EnumSet.of(ColumnType.MERGED), true, false); + EnumSet.of(MERGED), true, false); assertTrue(tm.hasMerged()); // Column not set mutation = TabletColumnFamily.createPrevRowMutation(extent); - tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), - EnumSet.of(ColumnType.MERGED), true, false); + tm = TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), EnumSet.of(MERGED), + true, false); assertFalse(tm.hasMerged()); // MERGED Column not fetched @@ -309,9 +310,8 @@ public class TabletMetadataTest { Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); mutation.put("1234567890abcdefg", "xyz", "v1"); - assertThrows(IllegalStateException.class, - () -> TabletMetadata.convertRow(toRowMap(mutation).entrySet().iterator(), - EnumSet.of(ColumnType.MERGED), true, false)); + assertThrows(IllegalStateException.class, () -> TabletMetadata + .convertRow(toRowMap(mutation).entrySet().iterator(), EnumSet.of(MERGED), true, false)); } private SortedMap<Key,Value> toRowMap(Mutation mutation) { @@ -353,7 +353,7 @@ public class TabletMetadataTest { .putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2) .putCompactionId(23).putBulkFile(rf1, 25).putBulkFile(rf2, 35).putFlushId(27) .putDirName("dir1").putScan(sf3).putScan(sf4).putCompacted(17).putCompacted(23) - .build(ECOMP, HOSTING_REQUESTED); + .build(ECOMP, HOSTING_REQUESTED, MERGED); assertEquals(extent, tm.getExtent()); assertEquals(TabletHostingGoal.NEVER, tm.getHostingGoal()); @@ -367,6 +367,7 @@ public class TabletMetadataTest { assertEquals(Set.of(), tm.getExternalCompactions().keySet()); assertEquals(Set.of(17L, 23L), tm.getCompacted()); assertFalse(tm.getHostingRequested()); + assertFalse(tm.hasMerged()); assertThrows(IllegalStateException.class, tm::getOperationId); assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); @@ -402,7 +403,7 @@ public class TabletMetadataTest { TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1) - .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).build(); + .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged().build(); assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet()); assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles()); @@ -416,6 +417,7 @@ public class TabletMetadataTest { assertEquals(159L, tm3.getSelectedFiles().getFateTxId()); assertFalse(tm3.getSelectedFiles().initiallySelectedAll()); assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue()); + assertTrue(tm3.hasMerged()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index d34a8f8355..49190e9e0f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -89,14 +89,18 @@ public class DeleteTablets extends ManagerRepo { for (var tabletMeta : tabletsMetadata) { MergeTablets.validateTablet(tabletMeta, fateStr, opid, data.tableId); + var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()) + .requireOperation(opid).requireAbsentLocation(); + // do not delete the last tablet if (Objects.equals(tabletMeta.getExtent().endRow(), lastEndRow)) { + // Clear the merged marker after we are finished on the last tablet + tabletMutator.deleteMerged(); + tabletMutator.submit((tm) -> !tm.hasMerged()); + submitted++; break; } - var tabletMutator = tabletsMutator.mutateTablet(tabletMeta.getExtent()) - .requireOperation(opid).requireAbsentLocation(); - tabletMeta.getKeyValues().keySet().forEach(key -> { log.trace("{} deleting {}", fateStr, key); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index 5e89a58d20..4859708b73 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -24,13 +24,13 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import static org.apache.accumulo.manager.tableOps.merge.DeleteRows.verifyAccepted; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -56,7 +56,6 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.server.gc.AllVolumesDirectory; import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +89,8 @@ public class MergeTablets extends ManagerRepo { try (var tabletsMetadata = manager.getContext().getAmple().readTablets() .forTable(range.tableId()).overlapping(range.prevEndRow(), range.endRow()) - .fetch(OPID, LOCATION, HOSTING_GOAL, FILES, TIME, DIR, ECOMP, PREV_ROW, LOGS).build()) { + .fetch(OPID, LOCATION, HOSTING_GOAL, FILES, TIME, DIR, ECOMP, PREV_ROW, LOGS, MERGED) + .build()) { int tabletsSeen = 0; @@ -149,10 +149,8 @@ public class MergeTablets extends ManagerRepo { // Check if the last tablet was already updated, this could happen if a process died and this // code is running a 2nd time. If running a 2nd time it possible the last tablet was updated and // only a subset of the other tablets were deleted. If the last tablet was never updated, then - // its prev row should be the greatest. - Comparator<Text> prevRowComparator = Comparator.nullsFirst(Text::compareTo); - if (prevRowComparator.compare(firstTabletMeta.getPrevEndRow(), lastTabletMeta.getPrevEndRow()) - < 0) { + // the merged marker should not exist + if (!lastTabletMeta.hasMerged()) { // update the last tablet try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { var lastExtent = lastTabletMeta.getExtent(); @@ -177,6 +175,10 @@ public class MergeTablets extends ManagerRepo { tabletMutator.putHostingGoal(DeleteRows.getMergeHostingGoal(range, goals)); tabletMutator.putPrevEndRow(firstTabletMeta.getPrevEndRow()); + // Set merged marker on the last tablet when we are finished + // so we know that we already updated metadata if the process restarts + tabletMutator.setMerged(); + // if the tablet no longer exists (because changed prev end row, then the update was // successful. tabletMutator.submit(Ample.RejectionHandler.acceptAbsentTablet());