This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 989011f6f0 moved merge code out of TGW and into Fate (#3854) 989011f6f0 is described below commit 989011f6f0b8752af7850363fc4254764d8248f5 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Oct 17 16:52:46 2023 -0400 moved merge code out of TGW and into Fate (#3854) Most of the code in the new DeleteRows and MergeTablets classes was copied from TabletGroupWatcher with slight modifications (like adding the Manager as parameter). --- .../accumulo/server/manager/state/MergeState.java | 4 - .../server/manager/state/MergeInfoTest.java | 4 +- .../java/org/apache/accumulo/manager/Manager.java | 4 +- .../accumulo/manager/TabletGroupWatcher.java | 753 +-------------------- .../apache/accumulo/manager/state/MergeStats.java | 238 ------- .../manager/tableOps/merge/DeleteRows.java | 600 ++++++++++++++++ ...bleRangeOpWait.java => FinishTableRangeOp.java} | 17 +- .../manager/tableOps/merge/MergeTablets.java | 282 ++++++++ .../manager/tableOps/merge/TableRangeOp.java | 6 +- .../manager/tableOps/merge/WaitForOffline.java | 81 +++ .../merge/DeleteRowsTest.java} | 10 +- .../apache/accumulo/test/manager/MergeStateIT.java | 247 ------- 12 files changed, 984 insertions(+), 1262 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java index fc15a0b39c..9bf6616bdf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeState.java @@ -23,10 +23,6 @@ public enum MergeState { * Not merging */ NONE, - /** - * created, stored in zookeeper, other merges are prevented on the table - */ - STARTED, /** * when the number of chopped tablets in the range matches the number of online tablets in the * range, take the tablets offline diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java index 01ae58bf24..2d7a3a7927 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java @@ -76,7 +76,7 @@ public class MergeInfoTest { Text prevEndRow = new Text("begin"); keyExtent = new KeyExtent(TableId.of(table), endRow, prevEndRow); mi = new MergeInfo(keyExtent, MergeInfo.Operation.DELETE); - mi.setState(MergeState.STARTED); + mi.setState(MergeState.WAITING_FOR_OFFLINE); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); mi.write(dos); @@ -84,7 +84,7 @@ public class MergeInfoTest { DataInputStream dis = new DataInputStream(bais); mi = new MergeInfo(); mi.readFields(dis); - assertSame(MergeState.STARTED, mi.getState()); + assertSame(MergeState.WAITING_FOR_OFFLINE, mi.getState()); assertEquals(keyExtent, mi.getExtent()); assertSame(MergeInfo.Operation.DELETE, mi.getOperation()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 21183349f5..f10dbd25c1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -539,7 +539,7 @@ public class Manager extends AbstractServer throw new AssertionError("Unlikely", ex); } context.getZooReaderWriter().putPersistentData(path, out.getData(), - state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL + state.equals(MergeState.WAITING_FOR_OFFLINE) ? 
ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE); } mergeLock.notifyAll(); @@ -711,8 +711,6 @@ public class Manager extends AbstractServer case NONE: case COMPLETE: break; - case STARTED: - return TabletGoalState.HOSTED; case WAITING_FOR_OFFLINE: case MERGING: return TabletGoalState.UNASSIGNED; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 31c15071a4..29fe40cdce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -26,44 +26,29 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Predicate; -import java.util.stream.Stream; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; -import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil; import 
org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; @@ -74,41 +59,25 @@ import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.Manager.TabletGoalState; import org.apache.accumulo.manager.split.SplitTask; -import org.apache.accumulo.manager.state.MergeStats; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.gc.AllVolumesDirectory; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; @@ -116,23 +85,17 @@ import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.ClosableIterator; import org.apache.accumulo.server.manager.state.DistributedStoreException; import org.apache.accumulo.server.manager.state.MergeInfo; -import org.apache.accumulo.server.manager.state.MergeState; import org.apache.accumulo.server.manager.state.TabletManagementIterator; import 
org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.manager.state.UnassignedTablet; -import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; abstract class TabletGroupWatcher extends AccumuloDaemonThread { @@ -327,8 +290,6 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private static class TableMgmtStats { int[] counts = new int[TabletState.values().length]; private int totalUnloaded; - - Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); } private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, @@ -339,10 +300,10 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { TableMgmtStats tableMgmtStats = new TableMgmtStats(); int unloaded = 0; - Map<TableId,MergeStats> currentMerges = new HashMap<>(); + Map<TableId,MergeInfo> currentMerges = new HashMap<>(); for (MergeInfo merge : manager.merges()) { if (merge.getExtent() != null) { - currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + currentMerges.put(merge.getExtent().tableId(), merge); } } @@ -391,11 +352,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - final MergeStats mergeStats = tableMgmtStats.mergeStatsCache.computeIfAbsent(tableId, k -> { - var mStats = currentMerges.get(k); - return mStats != null ? 
mStats : new MergeStats(new MergeInfo()); - }); - TabletGoalState goal = manager.getGoalState(tm, mergeStats.getMergeInfo()); + TabletGoalState goal = manager.getGoalState(tm, + currentMerges.computeIfAbsent(tm.getTableId(), k -> new MergeInfo())); TabletState state = TabletState.compute(tm, currentTServers.keySet(), manager.tabletBalancer, resourceGroups); @@ -414,7 +372,6 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { if (isFullScan) { stats.update(tableId, state); } - mergeStats.update(tm.getExtent(), state); // Always follow through with assignments if (state == TabletState.ASSIGNED) { @@ -633,8 +590,6 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { tabletMgmtStats.totalUnloaded); } - updateMergeState(tabletMgmtStats.mergeStatsCache); - synchronized (this) { lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet()); } @@ -809,706 +764,6 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return result; } - private void updateMergeState(Map<TableId,MergeStats> mergeStatsCache) { - for (MergeStats stats : mergeStatsCache.values()) { - try { - MergeState update = stats.nextMergeState(manager.getContext(), manager); - // when next state is MERGING, its important to persist this before - // starting the merge... 
the verification check that is done before - // moving into the merging state could fail if merge starts but does - // not finish - if (update == MergeState.COMPLETE) { - update = MergeState.NONE; - } - if (update != stats.getMergeInfo().getState()) { - manager.setMergeState(stats.getMergeInfo(), update); - } - - if (update == MergeState.MERGING) { - try { - if (stats.getMergeInfo().isDelete()) { - deleteTablets(stats.getMergeInfo()); - } else { - mergeMetadataRecords(stats.getMergeInfo()); - } - update = MergeState.COMPLETE; - manager.setMergeState(stats.getMergeInfo(), update); - } catch (Exception ex) { - Manager.log.error("Unable merge metadata table records", ex); - } - } - } catch (Exception ex) { - Manager.log.error( - "Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex); - } - } - } - - // This method finds returns the deletion starting row (exclusive) for tablets that - // need to be actually deleted. If the startTablet is null then - // the deletion start row will just be null as all tablets are being deleted - // up to the end. Otherwise, this returns the endRow of the first tablet - // as the first tablet should be kept and will have been previously - // fenced if necessary - private Text getDeletionStartRow(final KeyExtent startTablet) { - if (startTablet == null) { - Manager.log.debug("First tablet for delete range is null"); - return null; - } - - final Text deletionStartRow = startTablet.endRow(); - Manager.log.debug("Start row is {} for deletion", deletionStartRow); - - return deletionStartRow; - } - - // This method finds returns the deletion ending row (inclusive) for tablets that - // need to be actually deleted. If the endTablet is null then - // the deletion end row will just be null as all tablets are being deleted - // after the start row. 
Otherwise, this returns the prevEndRow of the last tablet - // as the last tablet should be kept and will have been previously - // fenced if necessary - private Text getDeletionEndRow(final KeyExtent endTablet) { - if (endTablet == null) { - Manager.log.debug("Last tablet for delete range is null"); - return null; - } - - Text deletionEndRow = endTablet.prevEndRow(); - Manager.log.debug("Deletion end row is {}", deletionEndRow); - - return deletionEndRow; - } - - private static boolean isFirstTabletInTable(KeyExtent tablet) { - return tablet != null && tablet.prevEndRow() == null; - } - - private static boolean isLastTabletInTable(KeyExtent tablet) { - return tablet != null && tablet.endRow() == null; - } - - private static boolean areContiguousTablets(KeyExtent firstTablet, KeyExtent lastTablet) { - return firstTablet != null && lastTablet != null - && Objects.equals(firstTablet.endRow(), lastTablet.prevEndRow()); - } - - private boolean hasTabletsToDelete(final KeyExtent firstTabletInRange, - final KeyExtent lastTableInRange) { - // If the tablets are equal (and not null) then the deletion range is just part of 1 tablet - // which will be fenced so there are no tablets to delete. 
The null check is because if both - // are null then we are just deleting everything, so we do have tablets to delete - if (Objects.equals(firstTabletInRange, lastTableInRange) && firstTabletInRange != null) { - Manager.log.trace( - "No tablets to delete, firstTablet {} equals lastTablet {} in deletion range and was fenced.", - firstTabletInRange, lastTableInRange); - return false; - // If the lastTablet of the deletion range is the first tablet of the table it has been fenced - // already so nothing to actually delete before it - } else if (isFirstTabletInTable(lastTableInRange)) { - Manager.log.trace( - "No tablets to delete, lastTablet {} in deletion range is the first tablet of the table and was fenced.", - lastTableInRange); - return false; - // If the firstTablet of the deletion range is the last tablet of the table it has been fenced - // already so nothing to actually delete after it - } else if (isLastTabletInTable(firstTabletInRange)) { - Manager.log.trace( - "No tablets to delete, firstTablet {} in deletion range is the last tablet of the table and was fenced.", - firstTabletInRange); - return false; - // If the firstTablet and lastTablet are contiguous tablets then there is nothing to delete as - // each will be fenced and nothing between - } else if (areContiguousTablets(firstTabletInRange, lastTableInRange)) { - Manager.log.trace( - "No tablets to delete, firstTablet {} and lastTablet {} in deletion range are contiguous and were fenced.", - firstTabletInRange, lastTableInRange); - return false; - } - - return true; - } - - private void deleteTablets(MergeInfo info) throws AccumuloException { - // Before updated metadata and get the first and last tablets which - // are fenced if necessary - final Pair<KeyExtent,KeyExtent> firstAndLastTablets = updateMetadataRecordsForDelete(info); - - // Find the deletion start row (exclusive) for tablets that need to be actually deleted - // This will be null if deleting everything up until the end row or it will be 
- // the endRow of the first tablet as the first tablet should be kept and will have - // already been fenced if necessary - final Text deletionStartRow = getDeletionStartRow(firstAndLastTablets.getFirst()); - - // Find the deletion end row (inclusive) for tablets that need to be actually deleted - // This will be null if deleting everything after the starting row or it will be - // the prevEndRow of the last tablet as the last tablet should be kept and will have - // already been fenced if necessary - Text deletionEndRow = getDeletionEndRow(firstAndLastTablets.getSecond()); - - // check if there are any tablets to delete and if not return - if (!hasTabletsToDelete(firstAndLastTablets.getFirst(), firstAndLastTablets.getSecond())) { - Manager.log.trace("No tablets to delete for range {}, returning", info.getExtent()); - return; - } - - // Build an extent for the actual deletion range - final KeyExtent extent = - new KeyExtent(info.getExtent().tableId(), deletionEndRow, deletionStartRow); - Manager.log.debug("Tablet deletion range is {}", extent); - String targetSystemTable = extent.isMeta() ? 
RootTable.NAME : MetadataTable.NAME; - Manager.log.debug("Deleting tablets for {}", extent); - MetadataTime metadataTime = null; - KeyExtent followingTablet = null; - Set<TabletHostingGoal> goals = new HashSet<>(); - if (extent.endRow() != null) { - Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW); - followingTablet = - getHighTablet(new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow())); - Manager.log.debug("Found following tablet {}", followingTablet); - } - try { - AccumuloClient client = manager.getContext(); - ServerContext context = manager.getContext(); - Ample ample = context.getAmple(); - Text start = extent.prevEndRow(); - if (start == null) { - start = new Text(); - } - Manager.log.debug("Making file deletion entries for {}", extent); - Range deleteRange = new Range(TabletsSection.encodeRow(extent.tableId(), start), false, - TabletsSection.encodeRow(extent.tableId(), extent.endRow()), true); - Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(deleteRange); - ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - ServerColumnFamily.TIME_COLUMN.fetch(scanner); - HostingColumnFamily.GOAL_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); - Set<ReferenceFile> datafilesAndDirs = new TreeSet<>(); - for (Entry<Key,Value> entry : scanner) { - Key key = entry.getKey(); - if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { - var stf = new StoredTabletFile(key.getColumnQualifierData().toString()); - datafilesAndDirs.add(new ReferenceFile(stf.getTableId(), stf)); - if (datafilesAndDirs.size() > 1000) { - ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); - datafilesAndDirs.clear(); - } - } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { - metadataTime = MetadataTime.parse(entry.getValue().toString()); - } else if 
(key.compareColumnFamily(CurrentLocationColumnFamily.NAME) == 0) { - throw new IllegalStateException( - "Tablet " + key.getRow() + " is assigned during a merge!"); - } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - var allVolumesDirectory = - new AllVolumesDirectory(extent.tableId(), entry.getValue().toString()); - datafilesAndDirs.add(allVolumesDirectory); - if (datafilesAndDirs.size() > 1000) { - ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); - datafilesAndDirs.clear(); - } - } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) { - TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(entry.getValue()); - goals.add(thisGoal); - } - } - ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); - BatchWriter bw = client.createBatchWriter(targetSystemTable); - try { - deleteTablets(info, deleteRange, bw, client); - } finally { - bw.close(); - } - - if (followingTablet != null) { - Manager.log.debug("Updating prevRow of {} to {}", followingTablet, extent.prevEndRow()); - bw = client.createBatchWriter(targetSystemTable); - try { - Mutation m = new Mutation(followingTablet.toMetaRow()); - TabletColumnFamily.PREV_ROW_COLUMN.put(m, - TabletColumnFamily.encodePrevEndRow(extent.prevEndRow())); - bw.addMutation(m); - bw.flush(); - } finally { - bw.close(); - } - } else { - // Recreate the default tablet to hold the end of the table - MetadataTableUtil.addTablet(new KeyExtent(extent.tableId(), null, extent.prevEndRow()), - ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, manager.getContext(), - metadataTime.getType(), manager.managerLock, getMergeHostingGoal(extent, goals)); - } - } catch (RuntimeException | TableNotFoundException ex) { - throw new AccumuloException(ex); - } - } - - private void mergeMetadataRecords(MergeInfo info) throws AccumuloException { - KeyExtent range = info.getExtent(); - Manager.log.debug("Merging metadata for {}", range); - KeyExtent stop = getHighTablet(range); - 
Manager.log.debug("Highest tablet is {}", stop); - Value firstPrevRowValue = null; - Text stopRow = stop.toMetaRow(); - Text start = range.prevEndRow(); - if (start == null) { - start = new Text(); - } - Range scanRange = - new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false); - String targetSystemTable = MetadataTable.NAME; - if (range.isMeta()) { - targetSystemTable = RootTable.NAME; - } - Set<TabletHostingGoal> goals = new HashSet<>(); - - AccumuloClient client = manager.getContext(); - - KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow()); - KeyExtent previousKeyExtent = null; - KeyExtent lastExtent = null; - - try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { - long fileCount = 0; - // Make file entries in highest tablet - Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); - // Update to set the range to include the highest tablet - scanner.setRange( - new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, true)); - TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - ServerColumnFamily.TIME_COLUMN.fetch(scanner); - ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - HostingColumnFamily.GOAL_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - Mutation m = new Mutation(stopRow); - MetadataTime maxLogicalTime = null; - for (Entry<Key,Value> entry : scanner) { - Key key = entry.getKey(); - Value value = entry.getValue(); - - final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow()); - - // Keep track of the last Key Extent seen so we can use it to fence - // of RFiles when merging the metadata - if (lastExtent != null && !keyExtent.equals(lastExtent)) { - previousKeyExtent = lastExtent; - } - - // Special case to handle the highest/stop tablet, which is where files are - // merged to. The existing merge code won't delete files from this tablet - // so we need to handle the deletes in this tablet when fencing files. 
- // We may be able to make this simpler in the future. - if (keyExtent.equals(stopExtent)) { - if (previousKeyExtent != null - && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - - // Fence off existing files by the end row of the previous tablet and current tablet - final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); - // The end row should be inclusive for the current tablet and the previous end row - // should be exclusive for the start row - Range fenced = new Range(previousKeyExtent.endRow(), false, keyExtent.endRow(), true); - - // Clip range if exists - fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; - - final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); - // If the existing metadata does not match then we need to delete the old - // and replace with a new range - if (!existing.equals(newFile)) { - m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText()); - m.put(key.getColumnFamily(), newFile.getMetadataText(), value); - } - - fileCount++; - } - // For the highest tablet we only care about the DataFileColumnFamily - continue; - } - - // Handle metadata updates for all other tablets except the highest tablet - // Ranges are created for the files and then added to the highest tablet in - // the merge range. Deletes are handled later for the old files when the tablets - // are removed. - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); - - // Fence off files by the previous tablet and current tablet that is being merged - // The end row should be inclusive for the current tablet and the previous end row should - // be exclusive for the start row. - Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null, - false, keyExtent.endRow(), true); - - // Clip range with the tablet range if the range already exists - fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; - - // Move the file and range to the last tablet - StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); - m.put(key.getColumnFamily(), newFile.getMetadataText(), value); - - fileCount++; - } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) - && firstPrevRowValue == null) { - Manager.log.debug("prevRow entry for lowest tablet is {}", value); - firstPrevRowValue = new Value(value); - } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { - maxLogicalTime = - TabletTime.maxMetadataTime(maxLogicalTime, MetadataTime.parse(value.toString())); - } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - var allVolumesDir = new AllVolumesDirectory(range.tableId(), value.toString()); - bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir)); - } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) { - TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(value); - goals.add(thisGoal); - } - - lastExtent = keyExtent; - } - - // read the logical time from the last tablet in the merge range, it is not included in - // the loop above - scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(new Range(stopRow)); - ServerColumnFamily.TIME_COLUMN.fetch(scanner); - HostingColumnFamily.GOAL_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - Set<String> extCompIds = new HashSet<>(); - for (Entry<Key,Value> entry : scanner) { - if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { - maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, - MetadataTime.parse(entry.getValue().toString())); - } else if (ExternalCompactionColumnFamily.NAME.equals(entry.getKey().getColumnFamily())) { - 
extCompIds.add(entry.getKey().getColumnQualifierData().toString()); - } else if (HostingColumnFamily.GOAL_COLUMN.hasColumns(entry.getKey())) { - TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(entry.getValue()); - goals.add(thisGoal); - } - } - - if (maxLogicalTime != null) { - ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.encode())); - } - - // delete any entries for external compactions - extCompIds.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid)); - - // Set the TabletHostingGoal for this tablet based on the goals of the other tablets in - // the merge range. Always takes priority over never. - TabletHostingGoal mergeHostingGoal = getMergeHostingGoal(range, goals); - HostingColumnFamily.GOAL_COLUMN.put(m, TabletHostingGoalUtil.toValue(mergeHostingGoal)); - - if (!m.getUpdates().isEmpty()) { - bw.addMutation(m); - } - - bw.flush(); - - Manager.log.debug("Moved {} files to {}", fileCount, stop); - - if (firstPrevRowValue == null) { - Manager.log.debug("tablet already merged"); - return; - } - - stop = new KeyExtent(stop.tableId(), stop.endRow(), - TabletColumnFamily.decodePrevEndRow(firstPrevRowValue)); - Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop); - Manager.log.debug("Setting the prevRow for last tablet: {}", stop); - bw.addMutation(updatePrevRow); - bw.flush(); - - deleteTablets(info, scanRange, bw, client); - - } catch (Exception ex) { - throw new AccumuloException(ex); - } - } - - private static TabletHostingGoal getMergeHostingGoal(KeyExtent range, - Set<TabletHostingGoal> goals) { - TabletHostingGoal mergeHostingGoal = TabletHostingGoal.ONDEMAND; - if (range.isMeta() || goals.contains(TabletHostingGoal.ALWAYS)) { - mergeHostingGoal = TabletHostingGoal.ALWAYS; - } else if (goals.contains(TabletHostingGoal.NEVER)) { - mergeHostingGoal = TabletHostingGoal.NEVER; - } - return mergeHostingGoal; - } - - // This method is used to detect if a tablet needs to be split/chopped for a 
delete - // Instead of performing a split or chop compaction, the tablet will have its files fenced. - private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) { - // Does this extent cover the end points of the delete? - final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r); - final Predicate<Text> isNotBoundary = - r -> !r.equals(keyExtent.endRow()) && !r.equals(keyExtent.prevEndRow()); - final KeyExtent deleteRange = info.getExtent(); - - return (keyExtent.overlaps(deleteRange) && Stream - .of(deleteRange.prevEndRow(), deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary))) - || info.needsToBeChopped(keyExtent); - } - - // Instead of splitting or chopping tablets for a delete we instead create ranges - // to exclude the portion of the tablet that should be deleted - private Text followingRow(Text row) { - if (row == null) { - return null; - } - return new Key(row).followingKey(PartialKey.ROW).getRow(); - } - - // Instead of splitting or chopping tablets for a delete we instead create ranges - // to exclude the portion of the tablet that should be deleted - private List<Range> createRangesForDeletion(TabletMetadata tabletMetadata, - final KeyExtent deleteRange) { - final KeyExtent tabletExtent = tabletMetadata.getExtent(); - - // If the delete range wholly contains the tablet being deleted then there is no range to clip - // files to because the files should be completely dropped. 
- Preconditions.checkArgument(!deleteRange.contains(tabletExtent), "delete range:%s tablet:%s", - deleteRange, tabletExtent); - - final List<Range> ranges = new ArrayList<>(); - - if (deleteRange.overlaps(tabletExtent)) { - if (deleteRange.prevEndRow() != null - && tabletExtent.contains(followingRow(deleteRange.prevEndRow()))) { - Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, - tabletExtent.prevEndRow(), deleteRange.prevEndRow()); - ranges.add(new Range(tabletExtent.prevEndRow(), false, deleteRange.prevEndRow(), true)); - } - - // This covers the case of when a deletion range overlaps the last tablet. We need to create a - // range that excludes the deletion. - if (deleteRange.endRow() != null - && tabletMetadata.getExtent().contains(deleteRange.endRow())) { - Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, deleteRange.endRow(), - tabletExtent.endRow()); - ranges.add(new Range(deleteRange.endRow(), false, tabletExtent.endRow(), true)); - } - } else { - Manager.log.trace( - "Fencing tablet {} files to itself because it does not overlap delete range", - tabletExtent); - ranges.add(tabletExtent.toDataRange()); - } - - return ranges; - } - - private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(MergeInfo info) - throws AccumuloException { - final KeyExtent range = info.getExtent(); - - String targetSystemTable = MetadataTable.NAME; - if (range.isMeta()) { - targetSystemTable = RootTable.NAME; - } - final Pair<KeyExtent,KeyExtent> startAndEndTablets; - - final AccumuloClient client = manager.getContext(); - - try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { - final Text startRow = range.prevEndRow(); - final Text endRow = range.endRow() != null - ? 
new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : null; - - // Find the tablets that overlap the start and end row of the deletion range - // If the startRow is null then there will be an empty startTablet we don't need - // to fence a starting tablet as we are deleting everything up to the end tablet - // Likewise, if the endRow is null there will be an empty endTablet as we are deleting - // all tablets after the starting tablet - final Optional<TabletMetadata> startTablet = Optional.ofNullable(startRow).flatMap( - row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES)); - final Optional<TabletMetadata> endTablet = Optional.ofNullable(endRow).flatMap( - row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES)); - - // Store the tablets in a Map if present so that if we have the same Tablet we - // only need to process the same tablet once when fencing - final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new TreeMap<>(); - startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft)); - endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), lt)); - - // Capture the tablets to return them or null if not loaded - startAndEndTablets = new Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null), - endTablet.map(TabletMetadata::getExtent).orElse(null)); - - for (TabletMetadata tabletMetadata : tabletMetadatas.values()) { - final KeyExtent keyExtent = tabletMetadata.getExtent(); - - // Check if this tablet needs to have its files fenced for the deletion - if (needsFencingForDeletion(info, keyExtent)) { - Manager.log.debug("Found overlapping keyExtent {} for delete, fencing files.", keyExtent); - - // Create the ranges for fencing the files, this takes the place of - // chop compactions and splits - final List<Range> ranges = createRangesForDeletion(tabletMetadata, range); - Preconditions.checkState(!ranges.isEmpty(), - "No ranges found that overlap deletion 
range."); - - // Go through and fence each of the files that are part of the tablet - for (Entry<StoredTabletFile,DataFileValue> entry : tabletMetadata.getFilesMap() - .entrySet()) { - final StoredTabletFile existing = entry.getKey(); - final DataFileValue value = entry.getValue(); - - final Mutation m = new Mutation(keyExtent.toMetaRow()); - - // Go through each range that was created and modify the metadata for the file - // The end row should be inclusive for the current tablet and the previous end row - // should be exclusive for the start row. - final Set<StoredTabletFile> newFiles = new HashSet<>(); - final Set<StoredTabletFile> existingFile = Set.of(existing); - - for (Range fenced : ranges) { - // Clip range with the tablet range if the range already exists - fenced = existing.hasRange() ? existing.getRange().clip(fenced, true) : fenced; - - // If null the range is disjoint which can happen if there are existing fenced files - // If the existing file is disjoint then later we will delete if the file is not part - // of the newFiles set which means it is disjoint with all ranges - if (fenced != null) { - final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); - Manager.log.trace("Adding new file {} with range {}", newFile.getMetadataPath(), - newFile.getRange()); - - // Add the new file to the newFiles set, it will be added later if it doesn't match - // the existing file already. We still need to add to the set to be checked later - // even if it matches the existing file as later the deletion logic will check to - // see if the existing file is part of this set before deleting. This is done to - // make sure the existing file isn't deleted unless it is not needed/disjoint - // with all ranges. 
- newFiles.add(newFile); - } else { - Manager.log.trace("Found a disjoint file {} with range {} on delete", - existing.getMetadataPath(), existing.getRange()); - } - } - - // If the existingFile is not contained in the newFiles set then we can delete it - Sets.difference(existingFile, newFiles).forEach( - delete -> m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText())); - - // Add any new files that don't match the existingFile - // As of now we will only have at most 2 files as up to 2 ranges are created - final List<StoredTabletFile> filesToAdd = - new ArrayList<>(Sets.difference(newFiles, existingFile)); - Preconditions.checkArgument(filesToAdd.size() <= 2, - "There should only be at most 2 StoredTabletFiles after computing new ranges."); - - // If more than 1 new file then re-calculate the num entries and size - if (filesToAdd.size() == 2) { - // This splits up the values in half and makes sure they total the original - // values - final Pair<DataFileValue,DataFileValue> newDfvs = computeNewDfv(value); - m.put(DataFileColumnFamily.NAME, filesToAdd.get(0).getMetadataText(), - newDfvs.getFirst().encodeAsValue()); - m.put(DataFileColumnFamily.NAME, filesToAdd.get(1).getMetadataText(), - newDfvs.getSecond().encodeAsValue()); - } else { - // Will be 0 or 1 files - filesToAdd.forEach(newFile -> m.put(DataFileColumnFamily.NAME, - newFile.getMetadataText(), value.encodeAsValue())); - } - - if (!m.getUpdates().isEmpty()) { - bw.addMutation(m); - } - } - } else { - Manager.log.debug( - "Skipping metadata update on file for keyExtent {} for delete as not overlapping on rows.", - keyExtent); - } - } - - bw.flush(); - - return startAndEndTablets; - } catch (Exception ex) { - throw new AccumuloException(ex); - } - } - - // Divide each new DFV in half and make sure the sum equals the original - @VisibleForTesting - protected static Pair<DataFileValue,DataFileValue> computeNewDfv(DataFileValue value) { - final DataFileValue file1Value = new 
DataFileValue(Math.max(1, value.getSize() / 2), - Math.max(1, value.getNumEntries() / 2), value.getTime()); - - final DataFileValue file2Value = - new DataFileValue(Math.max(1, value.getSize() - file1Value.getSize()), - Math.max(1, value.getNumEntries() - file1Value.getNumEntries()), value.getTime()); - - return new Pair<>(file1Value, file2Value); - } - - private Optional<TabletMetadata> loadTabletMetadata(TableId tabletId, final Text row, - ColumnType... columns) { - try (TabletsMetadata tabletsMetadata = manager.getContext().getAmple().readTablets() - .forTable(tabletId).overlapping(row, true, row).fetch(columns).build()) { - return tabletsMetadata.stream().findFirst(); - } - } - - private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, AccumuloClient client) - throws TableNotFoundException, MutationsRejectedException { - Scanner scanner; - Mutation m; - // Delete everything in the other tablets - // group all deletes into tablet into one mutation, this makes tablets - // either disappear entirely or not all.. this is important for the case - // where the process terminates in the loop below... - scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, - Authorizations.EMPTY); - Manager.log.debug("Deleting range {}", scanRange); - scanner.setRange(scanRange); - RowIterator rowIter = new RowIterator(scanner); - while (rowIter.hasNext()) { - Iterator<Entry<Key,Value>> row = rowIter.next(); - m = null; - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - - if (m == null) { - m = new Mutation(key.getRow()); - } - - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); - Manager.log.debug("deleting entry {}", key); - } - bw.addMutation(m); - } - - bw.flush(); - } - - private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { - try { - AccumuloClient client = manager.getContext(); - Scanner scanner = client.createScanner(range.isMeta() ? 
RootTable.NAME : MetadataTable.NAME, - Authorizations.EMPTY); - TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); - KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null); - scanner.setRange(new Range(start.toMetaRow(), null)); - Iterator<Entry<Key,Value>> iterator = scanner.iterator(); - if (!iterator.hasNext()) { - throw new AccumuloException("No last tablet for a merge " + range); - } - Entry<Key,Value> entry = iterator.next(); - KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry); - if (!highTablet.tableId().equals(range.tableId())) { - throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); - } - return highTablet; - } catch (Exception ex) { - throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, - ex); - } - } - private void handleDeadTablets(TabletLists tLists) throws WalMarkerException, DistributedStoreException { var deadTablets = tLists.assignedToDeadServers; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java deleted file mode 100644 index 59e4b087f1..0000000000 --- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.manager.state; - -import java.io.IOException; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; -import org.apache.accumulo.core.manager.state.TabletManagement; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.server.cli.ServerUtilOpts; -import org.apache.accumulo.server.manager.state.CurrentState; -import org.apache.accumulo.server.manager.state.MergeInfo; -import org.apache.accumulo.server.manager.state.MergeState; -import 
org.apache.accumulo.server.manager.state.TabletManagementIterator; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; - -public class MergeStats { - final static private Logger log = LoggerFactory.getLogger(MergeStats.class); - private final MergeInfo info; - private int hosted = 0; - private int unassigned = 0; - private int total = 0; - - public MergeStats(MergeInfo info) { - this.info = info; - } - - public MergeInfo getMergeInfo() { - return info; - } - - public void update(KeyExtent ke, TabletState state) { - if (info.getState().equals(MergeState.NONE)) { - return; - } - if (!info.overlaps(ke)) { - return; - } - this.total++; - if (state.equals(TabletState.HOSTED)) { - this.hosted++; - } - if (state.equals(TabletState.UNASSIGNED) || state.equals(TabletState.SUSPENDED)) { - this.unassigned++; - } - } - - public MergeState nextMergeState(AccumuloClient accumuloClient, CurrentState manager) - throws Exception { - MergeState state = info.getState(); - if (state == MergeState.NONE) { - return state; - } - if (total == 0) { - log.trace("failed to see any tablets for this range, ignoring {}", info.getExtent()); - return state; - } - log.info("Computing next merge state for {} which is presently {} isDelete : {}", - info.getExtent(), state, info.isDelete()); - if (state == MergeState.STARTED) { - log.info("{} are hosted, total {}", hosted, total); - if (!info.isDelete() && total == 1) { - log.info("Merge range is already contained in a single tablet {}", info.getExtent()); - state = MergeState.COMPLETE; - } else if (hosted == total) { - state = MergeState.WAITING_FOR_OFFLINE; - } else { - log.info("Waiting for {} hosted tablets to be {} {}", hosted, total, info.getExtent()); - } - } - if (state == MergeState.WAITING_FOR_OFFLINE) { - if (unassigned == total) { - if 
(verifyMergeConsistency(accumuloClient, manager)) { - state = MergeState.MERGING; - } else { - log.info("Merge consistency check failed {}", info.getExtent()); - } - } else { - log.info("Waiting for {} unassigned tablets to be {} {}", unassigned, total, - info.getExtent()); - } - } - if (state == MergeState.MERGING) { - if (hosted != 0) { - // Shouldn't happen - log.error("Unexpected state: hosted tablets should be zero {} merge {}", hosted, - info.getExtent()); - state = MergeState.WAITING_FOR_OFFLINE; - } - if (unassigned != total) { - // Shouldn't happen - log.error("Unexpected state: unassigned tablets should be {} was {} merge {}", total, - unassigned, info.getExtent()); - state = MergeState.WAITING_FOR_OFFLINE; - } - log.info("{} tablets are unassigned {}", unassigned, info.getExtent()); - } - return state; - } - - private boolean verifyMergeConsistency(AccumuloClient accumuloClient, CurrentState manager) - throws TableNotFoundException, IOException { - MergeStats verify = new MergeStats(info); - KeyExtent extent = info.getExtent(); - Scanner scanner = accumuloClient - .createScanner(extent.isMeta() ? 
RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY); - TabletManagementIterator.configureScanner(scanner, manager); - Text start = extent.prevEndRow(); - if (start == null) { - start = new Text(); - } - TableId tableId = extent.tableId(); - Text first = TabletsSection.encodeRow(tableId, start); - Range range = new Range(first, false, null, true); - scanner.setRange(range.clip(TabletsSection.getRange())); - KeyExtent prevExtent = null; - - log.debug("Scanning range {}", range); - for (Entry<Key,Value> entry : scanner) { - final TabletManagement mti = TabletManagementIterator.decode(entry); - final TabletMetadata tm = mti.getTabletMetadata(); - - log.debug("consistency check: {} walogs {}", tm, tm.getLogs().size()); - if (!tm.getTableId().equals(tableId)) { - break; - } - - if (prevExtent == null) { - // this is the first tablet observed, it must be offline and its prev row must be less than - // the start of the merge range - if (tm.getExtent().prevEndRow() != null - && tm.getExtent().prevEndRow().compareTo(start) > 0) { - log.debug("failing consistency: prev row is too high {}", start); - return false; - } - - Set<TServerInstance> liveTServers1 = manager.onlineTabletServers(); - if (TabletState.compute(tm, liveTServers1) != TabletState.UNASSIGNED) { - Set<TServerInstance> liveTServers = manager.onlineTabletServers(); - if (TabletState.compute(tm, liveTServers) != TabletState.SUSPENDED) { - log.debug("failing consistency: assigned or hosted {}", tm); - return false; - } - } - - } else if (!tm.getExtent().isPreviousExtent(prevExtent)) { - log.debug("hole in {}", MetadataTable.NAME); - return false; - } - - prevExtent = tm.getExtent(); - - Set<TServerInstance> liveTServers = manager.onlineTabletServers(); - verify.update(tm.getExtent(), TabletState.compute(tm, liveTServers)); - // stop when we've seen the tablet just beyond our range - if (tm.getExtent().prevEndRow() != null && extent.endRow() != null - && tm.getExtent().prevEndRow().compareTo(extent.endRow()) 
> 0) { - break; - } - } - log.debug("unassigned {} v.unassigned {} verify.total {}", unassigned, verify.unassigned, - verify.total); - - return unassigned == verify.unassigned && unassigned == verify.total; - } - - public static void main(String[] args) throws Exception { - ServerUtilOpts opts = new ServerUtilOpts(); - opts.parseArgs(MergeStats.class.getName(), args); - - Span span = TraceUtil.startSpan(MergeStats.class, "main"); - try (Scope scope = span.makeCurrent()) { - try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { - Map<String,String> tableIdMap = client.tableOperations().tableIdMap(); - ZooReaderWriter zooReaderWriter = opts.getServerContext().getZooReaderWriter(); - for (Entry<String,String> entry : tableIdMap.entrySet()) { - final String table = entry.getKey(), tableId = entry.getValue(); - String path = ZooUtil.getRoot(client.instanceOperations().getInstanceId()) - + Constants.ZTABLES + "/" + tableId + "/merge"; - MergeInfo info = new MergeInfo(); - if (zooReaderWriter.exists(path)) { - byte[] data = zooReaderWriter.getData(path); - DataInputBuffer in = new DataInputBuffer(); - in.reset(data, data.length); - info.readFields(in); - } - System.out.printf("%25s %10s %10s %s%n", table, info.getState(), info.getOperation(), - info.getExtent()); - } - } - } finally { - span.end(); - } - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java new file mode 100644 index 0000000000..c6614cae5f --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Repo; +import 
org.apache.accumulo.core.gc.ReferenceFile; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.gc.AllVolumesDirectory; +import org.apache.accumulo.server.manager.state.MergeInfo; +import org.apache.accumulo.server.manager.state.MergeState; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +public class DeleteRows extends ManagerRepo { + + private static final long serialVersionUID = 1L; + + private static final Logger log = LoggerFactory.getLogger(DeleteRows.class); + + private final NamespaceId namespaceId; + private final TableId tableId; + + public DeleteRows(NamespaceId namespaceId, TableId tableId) { + this.namespaceId = namespaceId; + this.tableId = tableId; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + MergeInfo mergeInfo = manager.getMergeInfo(tableId); + Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING); + Preconditions.checkState(mergeInfo.isDelete()); + + 
deleteTablets(manager, mergeInfo); + + manager.setMergeState(mergeInfo, MergeState.COMPLETE); + + // TODO namespace id + return new FinishTableRangeOp(namespaceId, tableId); + } + + private void deleteTablets(Manager manager, MergeInfo info) throws AccumuloException { + // Before updated metadata and get the first and last tablets which + // are fenced if necessary + final Pair<KeyExtent,KeyExtent> firstAndLastTablets = + updateMetadataRecordsForDelete(manager, info); + + // Find the deletion start row (exclusive) for tablets that need to be actually deleted + // This will be null if deleting everything up until the end row or it will be + // the endRow of the first tablet as the first tablet should be kept and will have + // already been fenced if necessary + final Text deletionStartRow = getDeletionStartRow(firstAndLastTablets.getFirst()); + + // Find the deletion end row (inclusive) for tablets that need to be actually deleted + // This will be null if deleting everything after the starting row or it will be + // the prevEndRow of the last tablet as the last tablet should be kept and will have + // already been fenced if necessary + Text deletionEndRow = getDeletionEndRow(firstAndLastTablets.getSecond()); + + // check if there are any tablets to delete and if not return + if (!hasTabletsToDelete(firstAndLastTablets.getFirst(), firstAndLastTablets.getSecond())) { + log.trace("No tablets to delete for range {}, returning", info.getExtent()); + return; + } + + // Build an extent for the actual deletion range + final KeyExtent extent = + new KeyExtent(info.getExtent().tableId(), deletionEndRow, deletionStartRow); + log.debug("Tablet deletion range is {}", extent); + String targetSystemTable = extent.isMeta() ? 
RootTable.NAME : MetadataTable.NAME; + log.debug("Deleting tablets for {}", extent); + MetadataTime metadataTime = null; + KeyExtent followingTablet = null; + Set<TabletHostingGoal> goals = new HashSet<>(); + if (extent.endRow() != null) { + Key nextExtent = new Key(extent.endRow()).followingKey(PartialKey.ROW); + followingTablet = getHighTablet(manager, + new KeyExtent(extent.tableId(), nextExtent.getRow(), extent.endRow())); + log.debug("Found following tablet {}", followingTablet); + } + try { + AccumuloClient client = manager.getContext(); + ServerContext context = manager.getContext(); + Ample ample = context.getAmple(); + Text start = extent.prevEndRow(); + if (start == null) { + start = new Text(); + } + log.debug("Making file deletion entries for {}", extent); + Range deleteRange = + new Range(MetadataSchema.TabletsSection.encodeRow(extent.tableId(), start), false, + MetadataSchema.TabletsSection.encodeRow(extent.tableId(), extent.endRow()), true); + Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); + scanner.setRange(deleteRange); + MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + Set<ReferenceFile> datafilesAndDirs = new TreeSet<>(); + for (Map.Entry<Key,Value> entry : scanner) { + Key key = entry.getKey(); + if (key.compareColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME) == 0) { + var stf = new StoredTabletFile(key.getColumnQualifierData().toString()); + datafilesAndDirs.add(new ReferenceFile(stf.getTableId(), stf)); + if (datafilesAndDirs.size() > 1000) { + ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); + 
datafilesAndDirs.clear(); + } + } else if (MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { + metadataTime = MetadataTime.parse(entry.getValue().toString()); + } else if (key.compareColumnFamily( + MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { + throw new IllegalStateException( + "Tablet " + key.getRow() + " is assigned during a merge!"); + } else if (MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN + .hasColumns(key)) { + var allVolumesDirectory = + new AllVolumesDirectory(extent.tableId(), entry.getValue().toString()); + datafilesAndDirs.add(allVolumesDirectory); + if (datafilesAndDirs.size() > 1000) { + ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); + datafilesAndDirs.clear(); + } + } else if (MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) { + TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(entry.getValue()); + goals.add(thisGoal); + } + } + ample.putGcFileAndDirCandidates(extent.tableId(), datafilesAndDirs); + BatchWriter bw = client.createBatchWriter(targetSystemTable); + try { + deleteTablets(info, deleteRange, bw, client); + } finally { + bw.close(); + } + + if (followingTablet != null) { + log.debug("Updating prevRow of {} to {}", followingTablet, extent.prevEndRow()); + bw = client.createBatchWriter(targetSystemTable); + try { + Mutation m = new Mutation(followingTablet.toMetaRow()); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, + MetadataSchema.TabletsSection.TabletColumnFamily + .encodePrevEndRow(extent.prevEndRow())); + bw.addMutation(m); + bw.flush(); + } finally { + bw.close(); + } + } else { + // Recreate the default tablet to hold the end of the table + MetadataTableUtil.addTablet(new KeyExtent(extent.tableId(), null, extent.prevEndRow()), + MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, + manager.getContext(), metadataTime.getType(), 
manager.getManagerLock(), + getMergeHostingGoal(extent, goals)); + } + } catch (RuntimeException | TableNotFoundException ex) { + throw new AccumuloException(ex); + } + } + + // Fences, where needed, the files of the tablets found at the start and end boundaries of the + // delete range so that they exclude the deleted rows. Returns the extents of those two tablets; + // either side of the pair is null when that tablet was not loaded (e.g. that end is unbounded). + private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(Manager manager, MergeInfo info) + throws AccumuloException { + final KeyExtent range = info.getExtent(); + + String targetSystemTable = MetadataTable.NAME; + if (range.isMeta()) { + targetSystemTable = RootTable.NAME; + } + final Pair<KeyExtent,KeyExtent> startAndEndTablets; + + final AccumuloClient client = manager.getContext(); + + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { + final Text startRow = range.prevEndRow(); + final Text endRow = range.endRow() != null + ? new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : null; + + // Find the tablets that overlap the start and end row of the deletion range + // If the startRow is null then there will be an empty startTablet we don't need + // to fence a starting tablet as we are deleting everything up to the end tablet + // Likewise, if the endRow is null there will be an empty endTablet as we are deleting + // all tablets after the starting tablet + final Optional<TabletMetadata> startTablet = + Optional.ofNullable(startRow).flatMap(row -> loadTabletMetadata(manager, range.tableId(), + row, TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.FILES)); + final Optional<TabletMetadata> endTablet = + Optional.ofNullable(endRow).flatMap(row -> loadTabletMetadata(manager, range.tableId(), + row, TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.FILES)); + + // Store the tablets in a Map if present so that if we have the same Tablet we + // only need to process the same tablet once when fencing + final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new TreeMap<>(); + startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft)); + endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), lt)); + + // 
Capture the tablets to return them or null if not loaded + startAndEndTablets = new Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null), + endTablet.map(TabletMetadata::getExtent).orElse(null)); + + for (TabletMetadata tabletMetadata : tabletMetadatas.values()) { + final KeyExtent keyExtent = tabletMetadata.getExtent(); + + // Check if this tablet needs to have its files fenced for the deletion + if (needsFencingForDeletion(info, keyExtent)) { + log.debug("Found overlapping keyExtent {} for delete, fencing files.", keyExtent); + + // Create the ranges for fencing the files, this takes the place of + // chop compactions and splits + final List<Range> ranges = createRangesForDeletion(tabletMetadata, range); + Preconditions.checkState(!ranges.isEmpty(), + "No ranges found that overlap deletion range."); + + // Go through and fence each of the files that are part of the tablet + for (Map.Entry<StoredTabletFile,DataFileValue> entry : tabletMetadata.getFilesMap() + .entrySet()) { + final StoredTabletFile existing = entry.getKey(); + final DataFileValue value = entry.getValue(); + + final Mutation m = new Mutation(keyExtent.toMetaRow()); + + // Go through each range that was created and modify the metadata for the file + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row. + final Set<StoredTabletFile> newFiles = new HashSet<>(); + final Set<StoredTabletFile> existingFile = Set.of(existing); + + for (Range fenced : ranges) { + // Clip range with the tablet range if the range already exists + fenced = existing.hasRange() ? 
existing.getRange().clip(fenced, true) : fenced; + + // If null the range is disjoint which can happen if there are existing fenced files + // If the existing file is disjoint then later we will delete if the file is not part + // of the newFiles set which means it is disjoint with all ranges + if (fenced != null) { + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + log.trace("Adding new file {} with range {}", newFile.getMetadataPath(), + newFile.getRange()); + + // Add the new file to the newFiles set, it will be added later if it doesn't match + // the existing file already. We still need to add to the set to be checked later + // even if it matches the existing file as later the deletion logic will check to + // see if the existing file is part of this set before deleting. This is done to + // make sure the existing file isn't deleted unless it is not needed/disjoint + // with all ranges. + newFiles.add(newFile); + } else { + log.trace("Found a disjoint file {} with range {} on delete", + existing.getMetadataPath(), existing.getRange()); + } + } + + // If the existingFile is not contained in the newFiles set then we can delete it + Sets.difference(existingFile, newFiles).forEach( + delete -> m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + delete.getMetadataText())); + + // Add any new files that don't match the existingFile + // As of now we will only have at most 2 files as up to 2 ranges are created + final List<StoredTabletFile> filesToAdd = + new ArrayList<>(Sets.difference(newFiles, existingFile)); + // Internal invariant (not a caller argument), so use checkState + Preconditions.checkState(filesToAdd.size() <= 2, + "There should only be at most 2 StoredTabletFiles after computing new ranges."); + + // If more than 1 new file then re-calculate the num entries and size + if (filesToAdd.size() == 2) { + // This splits up the values in half and makes sure they total the original + // values + final Pair<DataFileValue,DataFileValue> newDfvs = computeNewDfv(value); 
+ m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + filesToAdd.get(0).getMetadataText(), newDfvs.getFirst().encodeAsValue()); + m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + filesToAdd.get(1).getMetadataText(), newDfvs.getSecond().encodeAsValue()); + } else { + // Will be 0 or 1 files + filesToAdd + .forEach(newFile -> m.put(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + newFile.getMetadataText(), value.encodeAsValue())); + } + + if (!m.getUpdates().isEmpty()) { + bw.addMutation(m); + } + } + } else { + log.debug( + "Skipping metadata update on file for keyExtent {} for delete as not overlapping on rows.", + keyExtent); + } + } + + bw.flush(); + + return startAndEndTablets; + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + // This method returns the deletion starting row (exclusive) for tablets that + // need to be actually deleted. If the startTablet is null then + // the deletion start row will just be null as all tablets are being deleted + // up to the end. Otherwise, this returns the endRow of the first tablet + // as the first tablet should be kept and will have been previously + // fenced if necessary + private Text getDeletionStartRow(final KeyExtent startTablet) { + if (startTablet == null) { + log.debug("First tablet for delete range is null"); + return null; + } + + final Text deletionStartRow = startTablet.endRow(); + log.debug("Start row is {} for deletion", deletionStartRow); + + return deletionStartRow; + } + + // This method returns the deletion ending row (inclusive) for tablets that + // need to be actually deleted. If the endTablet is null then + // the deletion end row will just be null as all tablets are being deleted + // after the start row. 
+ // Otherwise, this returns the prevEndRow of the last tablet + // as the last tablet should be kept and will have been previously + // fenced if necessary + private Text getDeletionEndRow(final KeyExtent endTablet) { + if (endTablet == null) { + log.debug("Last tablet for delete range is null"); + return null; + } + + Text deletionEndRow = endTablet.prevEndRow(); + log.debug("Deletion end row is {}", deletionEndRow); + + return deletionEndRow; + } + + // Computes the hosting goal of a merged tablet: ALWAYS if the range is a metadata range or any + // input goal is ALWAYS, otherwise NEVER if any input goal is NEVER, otherwise ONDEMAND. + static TabletHostingGoal getMergeHostingGoal(KeyExtent range, Set<TabletHostingGoal> goals) { + TabletHostingGoal mergeHostingGoal = TabletHostingGoal.ONDEMAND; + if (range.isMeta() || goals.contains(TabletHostingGoal.ALWAYS)) { + mergeHostingGoal = TabletHostingGoal.ALWAYS; + } else if (goals.contains(TabletHostingGoal.NEVER)) { + mergeHostingGoal = TabletHostingGoal.NEVER; + } + return mergeHostingGoal; + } + + // Divide a DFV roughly in half. Each half's size and entry count is at least 1, so for + // very small values (e.g. size 1) the two halves can sum to more than the original. + @VisibleForTesting + static Pair<DataFileValue,DataFileValue> computeNewDfv(DataFileValue value) { + final DataFileValue file1Value = new DataFileValue(Math.max(1, value.getSize() / 2), + Math.max(1, value.getNumEntries() / 2), value.getTime()); + + final DataFileValue file2Value = + new DataFileValue(Math.max(1, value.getSize() - file1Value.getSize()), + Math.max(1, value.getNumEntries() - file1Value.getNumEntries()), value.getTime()); + + return new Pair<>(file1Value, file2Value); + } + + // Reads the first tablet of the given table that overlaps the single row, fetching only the + // requested columns. Returns an empty Optional when no such tablet is found. + private Optional<TabletMetadata> loadTabletMetadata(Manager manager, TableId tableId, + final Text row, TabletMetadata.ColumnType... 
columns) { + try (TabletsMetadata tabletsMetadata = manager.getContext().getAmple().readTablets() + .forTable(tableId).overlapping(row, true, row).fetch(columns).build()) { + return tabletsMetadata.stream().findFirst(); + } + } + + // This method is used to detect if a tablet needs to be split/chopped for a delete + // Instead of performing a split or chop compaction, the tablet will have its files fenced.
+ private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) { + // Does this extent cover the end points of the delete? + final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r); + final Predicate<Text> isNotBoundary = + r -> !r.equals(keyExtent.endRow()) && !r.equals(keyExtent.prevEndRow()); + final KeyExtent deleteRange = info.getExtent(); + + return (keyExtent.overlaps(deleteRange) && Stream + .of(deleteRange.prevEndRow(), deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary))) + || info.needsToBeChopped(keyExtent); + } + + // Returns the row immediately following the given row, or null if the given row is null + private Text followingRow(Text row) { + if (row == null) { + return null; + } + return new Key(row).followingKey(PartialKey.ROW).getRow(); + } + + // Instead of splitting or chopping tablets for a delete we instead create ranges + // to exclude the portion of the tablet that should be deleted + private List<Range> createRangesForDeletion(TabletMetadata tabletMetadata, + final KeyExtent deleteRange) { + final KeyExtent tabletExtent = tabletMetadata.getExtent(); + + // If the delete range wholly contains the tablet being deleted then there is no range to clip + // files to because the files should be completely dropped. + Preconditions.checkArgument(!deleteRange.contains(tabletExtent), "delete range:%s tablet:%s", + deleteRange, tabletExtent); + + final List<Range> ranges = new ArrayList<>(); + + if (deleteRange.overlaps(tabletExtent)) { + // This covers the case of when the first deleted row falls inside this tablet. We need to + // keep the portion of the tablet before the deletion. + if (deleteRange.prevEndRow() != null + && tabletExtent.contains(followingRow(deleteRange.prevEndRow()))) { + log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, tabletExtent.prevEndRow(), + deleteRange.prevEndRow()); + ranges.add(new Range(tabletExtent.prevEndRow(), false, deleteRange.prevEndRow(), true)); + } + + // This covers the case of when a deletion range overlaps the last tablet. 
We need to create a + // range that excludes the deletion. + if (deleteRange.endRow() != null + && tabletExtent.contains(deleteRange.endRow())) { + log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, deleteRange.endRow(), + tabletExtent.endRow()); + ranges.add(new Range(deleteRange.endRow(), false, tabletExtent.endRow(), true)); + } + } else { + log.trace("Fencing tablet {} files to itself because it does not overlap delete range", + tabletExtent); + ranges.add(tabletExtent.toDataRange()); + } + + return ranges; + } + + private static boolean isFirstTabletInTable(KeyExtent tablet) { + return tablet != null && tablet.prevEndRow() == null; + } + + private static boolean isLastTabletInTable(KeyExtent tablet) { + return tablet != null && tablet.endRow() == null; + } + + private static boolean areContiguousTablets(KeyExtent firstTablet, KeyExtent lastTablet) { + return firstTablet != null && lastTablet != null + && Objects.equals(firstTablet.endRow(), lastTablet.prevEndRow()); + } + + private boolean hasTabletsToDelete(final KeyExtent firstTabletInRange, + final KeyExtent lastTabletInRange) { + // If the tablets are equal (and not null) then the deletion range is just part of 1 tablet + // which will be fenced so there are no tablets to delete. 
The null check is because if both + // are null then we are just deleting everything, so we do have tablets to delete + if (Objects.equals(firstTabletInRange, lastTabletInRange) && firstTabletInRange != null) { + log.trace( + "No tablets to delete, firstTablet {} equals lastTablet {} in deletion range and was fenced.", + firstTabletInRange, lastTabletInRange); + return false; + // If the lastTablet of the deletion range is the first tablet of the table it has been fenced + // already so nothing to actually delete before it + } else if (isFirstTabletInTable(lastTabletInRange)) { + log.trace( + "No tablets to delete, lastTablet {} in deletion range is the first tablet of the table and was fenced.", + lastTabletInRange); + return false; + // If the firstTablet of the deletion range is the last tablet of the table it has been fenced + // already so nothing to actually delete after it + } else if (isLastTabletInTable(firstTabletInRange)) { + log.trace( + "No tablets to delete, firstTablet {} in deletion range is the last tablet of the table and was fenced.", + firstTabletInRange); + return false; + // If the firstTablet and lastTablet are contiguous tablets then there is nothing to delete as + // each will be fenced and nothing between + } else if (areContiguousTablets(firstTabletInRange, lastTabletInRange)) { + log.trace( + "No tablets to delete, firstTablet {} and lastTablet {} in deletion range are contiguous and were fenced.", + firstTabletInRange, lastTabletInRange); + return false; + } + + return true; + } + + // Deletes every metadata entry of the tablets whose rows fall in scanRange, then flushes bw. + static void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, AccumuloClient client) + throws TableNotFoundException, MutationsRejectedException { + // Delete everything in the other tablets + // group all deletes for a tablet into one mutation, this makes tablets + // either disappear entirely or not at all. This is important for the case + // where the process terminates in the loop below... 
+ // The scanner is closed by try-with-resources so it is not leaked. + try (Scanner scanner = client.createScanner( + info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY)) { + log.debug("Deleting range {}", scanRange); + scanner.setRange(scanRange); + RowIterator rowIter = new RowIterator(scanner); + while (rowIter.hasNext()) { + Iterator<Map.Entry<Key,Value>> row = rowIter.next(); + Mutation m = null; + while (row.hasNext()) { + Map.Entry<Key,Value> entry = row.next(); + Key key = entry.getKey(); + + if (m == null) { + m = new Mutation(key.getRow()); + } + + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + log.debug("deleting entry {}", key); + } + if (m != null) { + bw.addMutation(m); + } + } + } + + bw.flush(); + } + + // Returns the first tablet of range's table whose metadata row sorts at or after range's end + // row, i.e. the tablet that the end of the range falls in. + static KeyExtent getHighTablet(Manager manager, KeyExtent range) throws AccumuloException { + try { + AccumuloClient client = manager.getContext(); + try (Scanner scanner = client.createScanner( + range.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY)) { + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + KeyExtent start = new KeyExtent(range.tableId(), range.endRow(), null); + scanner.setRange(new Range(start.toMetaRow(), null)); + Iterator<Map.Entry<Key,Value>> iterator = scanner.iterator(); + if (!iterator.hasNext()) { + throw new AccumuloException("No last tablet for a merge " + range); + } + Map.Entry<Key,Value> entry = iterator.next(); + KeyExtent highTablet = KeyExtent.fromMetaPrevRow(entry); + if (!highTablet.tableId().equals(range.tableId())) { + throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); + } + return highTablet; + } + } catch (AccumuloException ex) { + // Already describes the problem precisely; do not wrap it in a generic failure + throw ex; + } catch (Exception ex) { + throw new 
AccumuloException("Unexpected failure finding the last tablet for a merge " + range, + ex); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java similarity index 86% rename from
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java rename to server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 9b73d4c538..574335dcf3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOpWait.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -25,11 +25,12 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.manager.state.MergeInfo; -import org.apache.accumulo.server.manager.state.MergeState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * ELASTICITY_TODO edit these docs which are pre elasticity changes. Best done after #3763 + * * Merge makes things hard. * * Typically, a client will read the list of tablets, and begin an operation on that tablet at the @@ -46,26 +47,18 @@ import org.slf4j.LoggerFactory; * Normal operations, like bulk imports, will grab the read lock and prevent merges (writes) while * they run. Merge operations will lock out some operations while they run. 
*/ -class TableRangeOpWait extends ManagerRepo { - private static final Logger log = LoggerFactory.getLogger(TableRangeOpWait.class); +class FinishTableRangeOp extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(FinishTableRangeOp.class); private static final long serialVersionUID = 1L; private TableId tableId; private NamespaceId namespaceId; - public TableRangeOpWait(NamespaceId namespaceId, TableId tableId) { + public FinishTableRangeOp(NamespaceId namespaceId, TableId tableId) { this.tableId = tableId; this.namespaceId = namespaceId; } - @Override - public long isReady(long tid, Manager env) { - if (!env.getMergeInfo(tableId).getState().equals(MergeState.NONE)) { - return 50; - } - return 0; - } - @Override public Repo<Manager> call(long tid, Manager manager) throws Exception { MergeInfo mergeInfo = manager.getMergeInfo(tableId); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java new file mode 100644 index 0000000000..d6a7de9716 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.gc.AllVolumesDirectory; +import org.apache.accumulo.server.manager.state.MergeInfo; +import org.apache.accumulo.server.manager.state.MergeState; +import org.apache.accumulo.server.tablets.TabletTime; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class MergeTablets extends ManagerRepo { + + private static final long serialVersionUID = 1L; + + private static final 
Logger log = LoggerFactory.getLogger(MergeTablets.class); + + private final NamespaceId namespaceId; + private final TableId tableId; + + public MergeTablets(NamespaceId namespaceId, TableId tableId) { + this.namespaceId = namespaceId; + this.tableId = tableId; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + MergeInfo mergeInfo = manager.getMergeInfo(tableId); + Preconditions.checkState(mergeInfo.getState() == MergeState.MERGING); + Preconditions.checkState(!mergeInfo.isDelete()); + + var extent = mergeInfo.getExtent(); + long tabletCount; + + try (var tabletMeta = manager.getContext().getAmple().readTablets().forTable(extent.tableId()) + .overlapping(extent.prevEndRow(), extent.endRow()).fetch(TabletMetadata.ColumnType.PREV_ROW) + .checkConsistency().build()) { + tabletCount = tabletMeta.stream().count(); + } + + if (tabletCount > 1) { + mergeMetadataRecords(manager, mergeInfo); + } + + return new FinishTableRangeOp(namespaceId, tableId); + } + + // Moves the files of every tablet in the merge range into the highest tablet (fencing each file + // to the rows of the tablet it came from), sets the highest tablet's prev row to the lowest + // tablet's prev row, and then deletes the metadata of the other tablets. + private void mergeMetadataRecords(Manager manager, MergeInfo info) throws AccumuloException { + KeyExtent range = info.getExtent(); + log.debug("Merging metadata for {}", range); + KeyExtent stop = DeleteRows.getHighTablet(manager, range); + log.debug("Highest tablet is {}", stop); + Value firstPrevRowValue = null; + Text stopRow = stop.toMetaRow(); + Text start = range.prevEndRow(); + if (start == null) { + start = new Text(); + } + Range scanRange = new Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start), + false, stopRow, false); + String targetSystemTable = MetadataTable.NAME; + if (range.isMeta()) { + targetSystemTable = RootTable.NAME; + } + Set<TabletHostingGoal> goals = new HashSet<>(); + + AccumuloClient client = manager.getContext(); + + KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow()); + KeyExtent previousKeyExtent = null; + KeyExtent lastExtent = null; + + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { + long 
fileCount = 0; + Mutation m = new Mutation(stopRow); + MetadataTime maxLogicalTime = null; + // Make file entries in highest tablet (the scanner is closed by try-with-resources) + try (Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY)) { + // Update to set the range to include the highest tablet + scanner.setRange(new Range(MetadataSchema.TabletsSection.encodeRow(range.tableId(), start), + false, stopRow, true)); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + for (Map.Entry<Key,Value> entry : scanner) { + Key key = entry.getKey(); + Value value = entry.getValue(); + + final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow()); + + // Keep track of the last Key Extent seen so we can use it to fence + // off RFiles when merging the metadata + if (lastExtent != null && !keyExtent.equals(lastExtent)) { + previousKeyExtent = lastExtent; + } + + // Special case to handle the highest/stop tablet, which is where files are + // merged to. The existing merge code won't delete files from this tablet + // so we need to handle the deletes in this tablet when fencing files. + // We may be able to make this simpler in the future. 
+ if (keyExtent.equals(stopExtent)) { + if (previousKeyExtent != null && key.getColumnFamily() + .equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) { + + // Fence off existing files by the end row of the previous tablet and current tablet + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row + Range fenced = new Range(previousKeyExtent.endRow(), false, keyExtent.endRow(), true); + + // Clip range if exists. A null result means the existing range is disjoint with the + // tablet range (possible with existing fenced files), so the file is not needed. + fenced = existing.hasRange() ? existing.getRange().clip(fenced, true) : fenced; + + if (fenced == null) { + log.trace("Dropping a disjoint file {} with range {} on merge", + existing.getMetadataPath(), existing.getRange()); + m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + existing.getMetadataText()); + } else { + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + // If the existing metadata does not match then we need to delete the old + // and replace with a new range + if (!existing.equals(newFile)) { + m.putDelete(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, + existing.getMetadataText()); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + } + + fileCount++; + } + } + // For the highest tablet we only care about the DataFileColumnFamily + continue; + } + + // Handle metadata updates for all other tablets except the highest tablet + // Ranges are created for the files and then added to the highest tablet in + // the merge range. Deletes are handled later for the old files when the tablets + // are removed. 
+ if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) { + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + + // Fence off files by the previous tablet and current tablet that is being merged + // The end row should be inclusive for the current tablet and the previous end row should + // be exclusive for the start row. + Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null, + false, keyExtent.endRow(), true); + + // Clip range with the tablet range if the range already exists. A null result means the + // existing range is disjoint with the tablet range, so there is nothing to move. + fenced = existing.hasRange() ? existing.getRange().clip(fenced, true) : fenced; + + if (fenced != null) { + // Move the file and range to the last tablet + StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + + fileCount++; + } else { + log.trace("Found a disjoint file {} with range {} on merge", + existing.getMetadataPath(), existing.getRange()); + } + } else if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) + && firstPrevRowValue == null) { + log.debug("prevRow entry for lowest tablet is {}", value); + firstPrevRowValue = new Value(value); + } else if (MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { + maxLogicalTime = + TabletTime.maxMetadataTime(maxLogicalTime, MetadataTime.parse(value.toString())); + } else if (MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN + .hasColumns(key)) { + var allVolumesDir = new AllVolumesDirectory(range.tableId(), value.toString()); + bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir)); + } else if (MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.hasColumns(key)) { + TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(value); + goals.add(thisGoal); + } + + lastExtent = keyExtent; + } + } + + // read the logical time from the last tablet in the merge range, it is not included in + // the loop above + Set<String> extCompIds = new HashSet<>(); + try (Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY)) { + scanner.setRange(new Range(stopRow)); + MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner); + 
MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME); + for (Map.Entry<Key,Value> entry : scanner) { + if (MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN 
.hasColumns(entry.getKey())) { + maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, + MetadataTime.parse(entry.getValue().toString())); + } else if (MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.NAME + .equals(entry.getKey().getColumnFamily())) { + extCompIds.add(entry.getKey().getColumnQualifierData().toString()); + } else if (MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN + .hasColumns(entry.getKey())) { + TabletHostingGoal thisGoal = TabletHostingGoalUtil.fromValue(entry.getValue()); + goals.add(thisGoal); + } + } + } + + if (maxLogicalTime != null) { + MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, + new Value(maxLogicalTime.encode())); + } + + // delete any entries for external compactions + extCompIds.forEach(ecid -> m + .putDelete(MetadataSchema.TabletsSection.ExternalCompactionColumnFamily.STR_NAME, ecid)); + + // Set the TabletHostingGoal for this tablet based on the goals of the other tablets in + // the merge range. Always takes priority over never. 
+ TabletHostingGoal mergeHostingGoal = DeleteRows.getMergeHostingGoal(range, goals); + MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN.put(m, + TabletHostingGoalUtil.toValue(mergeHostingGoal)); + + if (!m.getUpdates().isEmpty()) { + bw.addMutation(m); + } + + bw.flush(); + + log.debug("Moved {} files to {}", fileCount, stop); + + if (firstPrevRowValue == null) { + log.debug("tablet already merged"); + return; + } + + stop = new KeyExtent(stop.tableId(), stop.endRow(), + MetadataSchema.TabletsSection.TabletColumnFamily.decodePrevEndRow(firstPrevRowValue)); + Mutation updatePrevRow = + MetadataSchema.TabletsSection.TabletColumnFamily.createPrevRowMutation(stop); + log.debug("Setting the prevRow for last tablet: {}", stop); + bw.addMutation(updatePrevRow); + bw.flush(); + + DeleteRows.deleteTablets(info, scanRange, bw, client); + + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java index 2990b9e59f..fa1f587e3a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java @@ -86,12 +86,14 @@ public class TableRangeOp extends ManagerRepo { MergeInfo info = env.getMergeInfo(tableId); + // ELASTICITY_TODO can remove MergeState and MergeInfo once opid is set, these only exists now + // to get tablets unassigned. Once an opid is set on a tablet it will be unassigned.
See #3763 if (info.getState() == MergeState.NONE) { KeyExtent range = new KeyExtent(tableId, end, start); - env.setMergeState(new MergeInfo(range, op), MergeState.STARTED); + env.setMergeState(new MergeInfo(range, op), MergeState.WAITING_FOR_OFFLINE); } - return new TableRangeOpWait(namespaceId, tableId); + return new WaitForOffline(namespaceId, tableId); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java new file mode 100644 index 0000000000..c55ff213c5 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/WaitForOffline.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.merge; + +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.manager.state.MergeInfo; +import org.apache.accumulo.server.manager.state.MergeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Fate step of a merge or delete-rows operation. It is not ready until no tablet overlapping the + * operation's range has a location. It then marks the merge as MERGING and continues with either + * {@link DeleteRows} or {@link MergeTablets}, depending on the kind of operation. + */ +public class WaitForOffline extends ManagerRepo { + + private static final Logger log = LoggerFactory.getLogger(WaitForOffline.class); + + private static final long serialVersionUID = 1L; + + private final NamespaceId namespaceId; + private final TableId tableId; + + public WaitForOffline(NamespaceId namespaceId, TableId tableId) { + this.namespaceId = namespaceId; + this.tableId = tableId; + } + + /** + * Counts the tablets in the merge range that still have a location. + * + * @return 1000 while any such tablet exists (not ready yet), otherwise 0 (ready) + */ + @Override + public long isReady(long tid, Manager env) throws Exception { + final var mergeRange = env.getMergeInfo(tableId).getExtent(); + + final long locatedTablets; + try (var tablets = env.getContext().getAmple().readTablets().forTable(mergeRange.tableId()) + .overlapping(mergeRange.prevEndRow(), mergeRange.endRow()) + .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.LOCATION) + .checkConsistency().build()) { + locatedTablets = tablets.stream().filter(tablet -> tablet.getLocation() != null).count(); + } + + log.info("{} waiting for {} tablets with locations", FateTxId.formatTid(tid), + locatedTablets); + + return locatedTablets > 0 ? 1000 : 0; + } + + /** + * Marks the merge as MERGING and selects the next step: {@link DeleteRows} for a delete, + * otherwise {@link MergeTablets}. + */ + @Override + public Repo<Manager> call(long tid, Manager env) throws Exception { + final MergeInfo info = env.getMergeInfo(tableId); + env.setMergeState(info, MergeState.MERGING); + return info.isDelete() ? new DeleteRows(namespaceId, 
tableId) + : new MergeTablets(namespaceId, tableId); + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java similarity index 87% rename from server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java rename to server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java index 35026efbe9..0dfe5ee6a5 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/TabletGroupWatcherTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/DeleteRowsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.manager; +package org.apache.accumulo.manager.tableOps.merge; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -24,12 +24,12 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.util.Pair; import org.junit.jupiter.api.Test; -public class TabletGroupWatcherTest { +public class DeleteRowsTest { @Test public void testComputeNewDfvEven() { DataFileValue original = new DataFileValue(20, 10, 100); - Pair<DataFileValue,DataFileValue> newValues = TabletGroupWatcher.computeNewDfv(original); + Pair<DataFileValue,DataFileValue> newValues = DeleteRows.computeNewDfv(original); assertEquals(10, newValues.getFirst().getSize()); assertEquals(5, newValues.getFirst().getNumEntries()); @@ -42,7 +42,7 @@ public class TabletGroupWatcherTest { @Test public void testComputeNewDfvOdd() { DataFileValue original = new DataFileValue(21, 11, 100); - Pair<DataFileValue,DataFileValue> newValues = TabletGroupWatcher.computeNewDfv(original); + Pair<DataFileValue,DataFileValue> newValues = DeleteRows.computeNewDfv(original); assertEquals(10, newValues.getFirst().getSize()); assertEquals(5,
newValues.getFirst().getNumEntries()); @@ -55,7 +55,7 @@ public class TabletGroupWatcherTest { @Test public void testComputeNewDfvSmall() { DataFileValue original = new DataFileValue(1, 2, 100); - Pair<DataFileValue,DataFileValue> newValues = TabletGroupWatcher.computeNewDfv(original); + Pair<DataFileValue,DataFileValue> newValues = DeleteRows.computeNewDfv(original); assertEquals(1, newValues.getFirst().getSize()); assertEquals(1, newValues.getFirst().getNumEntries()); diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java deleted file mode 100644 index 1806f4c638..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.test.manager; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchDeleter; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.TabletManagement; -import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; -import org.apache.accumulo.core.manager.thrift.ManagerState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.security.Authorizations; -import 
org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.manager.state.MergeStats; -import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.manager.state.Assignment; -import org.apache.accumulo.server.manager.state.CurrentState; -import org.apache.accumulo.server.manager.state.MergeInfo; -import org.apache.accumulo.server.manager.state.MergeState; -import org.apache.accumulo.server.manager.state.TabletStateStore; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -import com.google.common.net.HostAndPort; - -public class MergeStateIT extends ConfigurableMacBase { - - private static class MockCurrentState implements CurrentState { - - TServerInstance someTServer = - new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456); - MergeInfo mergeInfo; - - MockCurrentState(MergeInfo info) { - this.mergeInfo = info; - } - - @Override - public Set<TableId> onlineTables() { - return Collections.singleton(TableId.of("t")); - } - - @Override - public Set<TServerInstance> onlineTabletServers() { - return Collections.singleton(someTServer); - } - - @Override - public Map<String,Set<TServerInstance>> tServerResourceGroups() { - return new HashMap<>(); - } - - @Override - public Collection<MergeInfo> merges() { - return Collections.singleton(mergeInfo); - } - - @Override - public Set<KeyExtent> migrationsSnapshot() { - return Collections.emptySet(); - } - - @Override - public ManagerState getManagerState() { - return ManagerState.NORMAL; - } - - @Override - public Map<Long,Map<String,String>> getCompactionHints() { - return Map.of(); - } - - @Override - public Set<TServerInstance> shutdownServers() { - return Collections.emptySet(); - } - - } - - private static void update(AccumuloClient c, Mutation m) - throws TableNotFoundException, MutationsRejectedException { - try 
(BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) { - bw.addMutation(m); - } - } - - @Test - public void test() throws Exception { - ServerContext context = getServerContext(); - try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProperties()).build()) { - accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(), - MetadataTable.NAME, TablePermission.WRITE); - BatchWriter bw = accumuloClient.createBatchWriter(MetadataTable.NAME); - - TreeSet<Text> splits = new TreeSet<>(); - splits.add(new Text("a")); - splits.add(new Text("e")); - splits.add(new Text("j")); - splits.add(new Text("o")); - splits.add(new Text("t")); - splits.add(new Text("z")); - NewTableConfiguration ntc = new NewTableConfiguration(); - ntc.withSplits(splits); - accumuloClient.tableOperations().create("merge_test_table"); - TableId tableId = - TableId.of(accumuloClient.tableOperations().tableIdMap().get("merge_test_table")); - - Text pr = null; - for (Text split : splits) { - Mutation prevRow = - TabletColumnFamily.createPrevRowMutation(new KeyExtent(tableId, split, pr)); - prevRow.put(CurrentLocationColumnFamily.NAME, new Text("123456"), - new Value("127.0.0.1:1234")); - bw.addMutation(prevRow); - pr = split; - } - bw.close(); - - // Read out the TabletLocationStates - MockCurrentState state = - new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), - MergeInfo.Operation.MERGE)); - - // Verify the tablet state: hosted, and count - TabletStateStore metaDataStateStore = - TabletStateStore.getStoreForLevel(DataLevel.USER, context, state); - int count = 0; - for (TabletManagement mti : metaDataStateStore) { - if (mti != null) { - assertEquals(1, mti.actions.size()); - assertEquals(ManagementAction.NEEDS_LOCATION_UPDATE, mti.getActions().iterator().next()); - count++; - } - } - assertEquals(6, count); - - // Create the hole - // Split the tablet at one end of the range - Mutation m = TabletColumnFamily - 
.createPrevRowMutation(new KeyExtent(tableId, new Text("t"), new Text("p"))); - TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5")); - TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, - TabletColumnFamily.encodePrevEndRow(new Text("o"))); - update(accumuloClient, m); - - // ELASTICITY_TODO: Tried to fix this up, not sure how this works - - // do the state check - MergeStats stats = scan(state, metaDataStateStore); - // MergeState newState = stats.nextMergeState(accumuloClient, state); - // assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); - - // unassign the tablets - try (BatchDeleter deleter = - accumuloClient.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000)) { - deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME); - deleter.setRanges(Collections.singletonList(new Range())); - deleter.delete(); - } - - // now we should be ready to merge but, we have inconsistent metadata - stats = scan(state, metaDataStateStore); - assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(accumuloClient, state)); - - // finish the split - KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o")); - m = TabletColumnFamily.createPrevRowMutation(tablet); - TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5")); - update(accumuloClient, m); - metaDataStateStore - .setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer, null))); - - stats = scan(state, metaDataStateStore); - assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(accumuloClient, state)); - - // take it offline - m = TabletColumnFamily.createPrevRowMutation(tablet); - List<LogEntry> walogs = Collections.emptyList(); - metaDataStateStore.unassign( - Collections.singletonList(TabletMetadata.builder(tablet) - .putLocation(Location.current(state.someTServer)) - .putHostingGoal(TabletHostingGoal.ALWAYS).build(ColumnType.LAST, ColumnType.SUSPEND)), - null); - - // now we can split - stats = scan(state, 
metaDataStateStore); - assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, state)); - } - } - - private MergeStats scan(MockCurrentState state, TabletStateStore metaDataStateStore) { - MergeStats stats = new MergeStats(state.mergeInfo); - stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE); - for (TabletManagement tm : metaDataStateStore) { - TabletMetadata tabletMetadata = tm.getTabletMetadata(); - stats.update(tm.getTabletMetadata().getExtent(), - TabletState.compute(tabletMetadata, state.onlineTabletServers())); - } - return stats; - } -}