This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 5b7272595086ae92521f8f7e5195cf574a6dea13 Merge: 32e4f3fda2 d32d10a9f3 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Jun 21 08:00:28 2024 -0400 Merge branch '2.1' .../accumulo/manager/TabletGroupWatcher.java | 78 +++++++++++++++++++--- 1 file changed, 68 insertions(+), 10 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 443df6c8f3,827f688c49..148882ae83 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -860,17 -797,18 +860,27 @@@ abstract class TabletGroupWatcher exten AccumuloClient client = manager.getContext(); + KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow()); ++ + // Used when scanning the table to track the extent of the previous column. + // This value is updated for every column read at the end of the loop below + // with the extent for the column. We scan multiple columns for each tablet, + // so this is useful to detect when we have reached a different tablet. + KeyExtent prevColumnExtent = null; + + // Used when scanning the table to track the previous tablet from the + // current one. This value will update whenever the current extent for + // the column read in the loop is different from the previously read column, + // which is tracked by prevColumnExtent KeyExtent previousKeyExtent = null; - KeyExtent lastExtent = null; + // Check if we have already previously fenced the tablets + if (highTablet.isMerged()) { + Manager.log.debug("tablet metadata already fenced for merge {}", range); + // Return as we already fenced the files + return; + } + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { long fileCount = 0; // Make file entries in highest tablet @@@ -892,79 -828,48 +902,100 @@@ Key key = entry.getKey(); Value value = entry.getValue(); + // Verify that Tablet is offline + if (isTabletAssigned(key)) { + throw new IllegalStateException( + "Tablet " + key.getRow() + " is assigned during a merge!"); + // Verify that Tablet has no WALs + } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { + throw new IllegalStateException("Tablet " + key.getRow() + " has walogs during a merge!"); + } + final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow()); - // Keep track of the last Key Extent seen so we can use it to fence - // of RFiles when merging the metadata - if (lastExtent != null && !keyExtent.equals(lastExtent)) { - previousKeyExtent = lastExtent; - // Keep track of extents to verify the linked list. ++ // Keep track of extents to verify the linked list and also we need the ++ // prevColumnExtent seen so we can use it to fence off RFiles when merging + // 'keyExtent' represents the current tablet for this colum + // 'prevColumnExtent' is the extent seen from the previous column read. + // 'previousKeyExtent' is the extent for the previous tablet + // + // If 'prevColumnExtent' is different from 'keyExtent' then we have reached a new tablet + // and we can update 'previousKeyExtent' with the value from 'prevColumnExtent' + if (prevColumnExtent != null && !keyExtent.equals(prevColumnExtent)) { + previousKeyExtent = prevColumnExtent; } - // Verify that Tablet is offline - if (isTabletAssigned(key)) { - throw new IllegalStateException( - "Tablet " + key.getRow() + " is assigned during a merge!"); - // Verify that Tablet has no WALs - } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { - throw new IllegalStateException("Tablet " + key.getRow() + " has walogs during a merge!"); - } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - m.put(key.getColumnFamily(), key.getColumnQualifier(), value); + // Special case to handle the highest/stop tablet, which is where files are + // merged to. The existing merge code won't delete files from this tablet + // so we need to handle the deletes in this tablet when fencing files. + // We may be able to make this simpler in the future. + if (keyExtent.equals(stopExtent)) { + if (previousKeyExtent != null + && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + + // Fence off existing files by the end row of the previous tablet and current tablet + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row + Range fenced = new Range(previousKeyExtent.endRow(), false, keyExtent.endRow(), true); + + // Clip range if exists + fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; + + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + // If the existing metadata does not match then we need to delete the old + // and replace with a new range + if (!existing.equals(newFile)) { + m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText()); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + } + + fileCount++; + } + // For the highest tablet we only care about the DataFileColumnFamily + continue; + } + + // Handle metadata updates for all other tablets except the highest tablet + // Ranges are created for the files and then added to the highest tablet in + // the merge range. Deletes are handled later for the old files when the tablets + // are removed. + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + + // Fence off files by the previous tablet and current tablet that is being merged + // The end row should be inclusive for the current tablet and the previous end row should + // be exclusive for the start row. + Range fenced = new Range(previousKeyExtent != null ? previousKeyExtent.endRow() : null, + false, keyExtent.endRow(), true); + + // Clip range with the tablet range if the range already exists + fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; + + // Move the file and range to the last tablet + StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + fileCount++; - } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) - && firstPrevRowValue == null) { - Manager.log.debug("prevRow entry for lowest tablet is {}", value); - firstPrevRowValue = new Value(value); + } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + // Handle the first tablet in the range + if (firstPrevRowValue == null) { + // This is the first PREV_ROW_COLUMN we are seeing, therefore this should be the first + // tablet and previousKeyExtent should be null + Preconditions.checkState(previousKeyExtent == null, + "previousKeyExtent was unexpectedly set when scanning metadata table %s %s %s", + previousKeyExtent, keyExtent, value); + Manager.log.debug("prevRow entry for lowest tablet is {}", value); + firstPrevRowValue = value; + // Handle other tablets, besides the first tablet. This will process every tablet in the + // merge range except for the last tablet as that tablet is not part of the scan range. + } else { + // This is not the first PREV_ROW_COLUMN we are seeing, therefore previousKeyExtent + // should never be null as this is at least the second tablet we are iterating over + // Because this loop does not process the last tablet in the range, this check should + // always be true because if the merge already happened we would not reach this point. + validateLinkedList(previousKeyExtent, value, keyExtent); + } } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, MetadataTime.parse(value.toString())); @@@ -999,284 -910,62 +1036,305 @@@ // delete any entries for external compactions extCompIds.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid)); - if (!m.getUpdates().isEmpty()) { - bw.addMutation(m); - } + // Add a marker so we know the tablets have been fenced in case the merge operation + // needs to be recovered and restarted to finish later. + MergedColumnFamily.MERGED_COLUMN.put(m, MergedColumnFamily.MERGED_VALUE); - bw.flush(); + // Add the prev row column update to the same mutation as the + // file updates so it will be atomic and only update the prev row + // if the tablets were fenced + Preconditions.checkState(firstPrevRowValue != null, + "Previous row entry for lowest tablet was not found."); + - Manager.log.debug("Moved {} files to {}", fileCount, stop); - - // If this is null then we already merged as only 1 tablet is part of the range - // and the previous scan skips the last tablet, so we just return. This makes the merge - // idempotent if run more than once. - if (firstPrevRowValue == null) { - Manager.log.debug("tablet already merged"); - return; - } else { - // At this point prevColumnExtent should never be null because at least 1 previous - // tablet was scanned. lastTabletPrevRowValue should also never be null as we read - // it from the last tablet. - validateLinkedList(prevColumnExtent, lastTabletPrevRowValue, stop); - } ++ // lastTabletPrevRowValue should also never be null as it should always ++ // be read from the last tablet and we already verified we did not ++ // already complete the merge by checking the merged marker earlier ++ validateLinkedList(prevColumnExtent, lastTabletPrevRowValue, stop); + stop = new KeyExtent(stop.tableId(), stop.endRow(), TabletColumnFamily.decodePrevEndRow(firstPrevRowValue)); - Mutation updatePrevRow = TabletColumnFamily.createPrevRowMutation(stop); + TabletColumnFamily.PREV_ROW_COLUMN.put(m, + TabletColumnFamily.encodePrevEndRow(stop.prevEndRow())); Manager.log.debug("Setting the prevRow for last tablet: {}", stop); - bw.addMutation(updatePrevRow); - bw.flush(); - - deleteTablets(info, scanRange, bw, client); - - // Clean-up the last chopped marker - var m2 = new Mutation(stopRow); - ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m2); - bw.addMutation(m2); + bw.addMutation(m); bw.flush(); + Manager.log.debug("Moved {} files to {}", fileCount, stop); } catch (Exception ex) { throw new AccumuloException(ex); } } + // Need to ensure the tablets being merged form a proper linked list by verifying the previous + // extent end row matches the prev end row value in the next tablet in the range + private static void validateLinkedList(KeyExtent previousExtent, Value prevEndRow, + KeyExtent currentTablet) { + Preconditions.checkState(previousExtent != null && prevEndRow != null, + "previousExtent or prevEndRow was unexpectedly not set when scanning metadata table %s %s %s", + previousExtent, prevEndRow, currentTablet); + + boolean pointsToPrevious = + Objects.equals(previousExtent.endRow(), TabletColumnFamily.decodePrevEndRow(prevEndRow)); + Preconditions.checkState(pointsToPrevious, + "unexpectedly saw a hole in the metadata table %s %s %s", previousExtent, prevEndRow, + currentTablet); + } + + private void deleteMergedTablets(MergeInfo info) throws AccumuloException { + KeyExtent range = info.getExtent(); + Manager.log.debug("Deleting merged tablets for {}", range); + HighTablet highTablet = getHighTablet(range); + if (!highTablet.isMerged()) { + Manager.log.debug("Tablets have already been deleted for merge with range {}, returning", + range); + return; + } + + KeyExtent stop = highTablet.getExtent(); + Manager.log.debug("Highest tablet is {}", stop); + + Text stopRow = stop.toMetaRow(); + Text start = range.prevEndRow(); + if (start == null) { + start = new Text(); + } + Range scanRange = + new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, false); + String targetSystemTable = AccumuloTable.METADATA.tableName(); + if (range.isMeta()) { + targetSystemTable = AccumuloTable.ROOT.tableName(); + } + + AccumuloClient client = manager.getContext(); + + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { + // Continue and delete the tablets that were merged + deleteTablets(info, scanRange, bw, client); + + // Clear the merged marker after we finish deleting tablets + clearMerged(info, bw, highTablet); + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + // This method is used to detect if a tablet needs to be split/chopped for a delete + // Instead of performing a split or chop compaction, the tablet will have its files fenced. + private boolean needsFencingForDeletion(MergeInfo info, KeyExtent keyExtent) { + // Does this extent cover the end points of the delete? + final Predicate<Text> isWithin = r -> r != null && keyExtent.contains(r); + final Predicate<Text> isNotBoundary = + r -> !r.equals(keyExtent.endRow()) && !r.equals(keyExtent.prevEndRow()); + final KeyExtent deleteRange = info.getExtent(); + + return (keyExtent.overlaps(deleteRange) && Stream + .of(deleteRange.prevEndRow(), deleteRange.endRow()).anyMatch(isWithin.and(isNotBoundary))) + || info.needsToBeChopped(keyExtent); + } + + // Instead of splitting or chopping tablets for a delete we instead create ranges + // to exclude the portion of the tablet that should be deleted + private Text followingRow(Text row) { + if (row == null) { + return null; + } + return new Key(row).followingKey(PartialKey.ROW).getRow(); + } + + // Instead of splitting or chopping tablets for a delete we instead create ranges + // to exclude the portion of the tablet that should be deleted + private List<Range> createRangesForDeletion(TabletMetadata tabletMetadata, + final KeyExtent deleteRange) { + final KeyExtent tabletExtent = tabletMetadata.getExtent(); + + // If the delete range wholly contains the tablet being deleted then there is no range to clip + // files to because the files should be completely dropped. + Preconditions.checkArgument(!deleteRange.contains(tabletExtent), "delete range:%s tablet:%s", + deleteRange, tabletExtent); + + final List<Range> ranges = new ArrayList<>(); + + if (deleteRange.overlaps(tabletExtent)) { + if (deleteRange.prevEndRow() != null + && tabletExtent.contains(followingRow(deleteRange.prevEndRow()))) { + Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, + tabletExtent.prevEndRow(), deleteRange.prevEndRow()); + ranges.add(new Range(tabletExtent.prevEndRow(), false, deleteRange.prevEndRow(), true)); + } + + // This covers the case of when a deletion range overlaps the last tablet. We need to create a + // range that excludes the deletion. + if (deleteRange.endRow() != null + && tabletMetadata.getExtent().contains(deleteRange.endRow())) { + Manager.log.trace("Fencing tablet {} files to ({},{}]", tabletExtent, deleteRange.endRow(), + tabletExtent.endRow()); + ranges.add(new Range(deleteRange.endRow(), false, tabletExtent.endRow(), true)); + } + } else { + Manager.log.trace( + "Fencing tablet {} files to itself because it does not overlap delete range", + tabletExtent); + ranges.add(tabletExtent.toDataRange()); + } + + return ranges; + } + + private Pair<KeyExtent,KeyExtent> updateMetadataRecordsForDelete(MergeInfo info) + throws AccumuloException { + final KeyExtent range = info.getExtent(); + + String targetSystemTable = AccumuloTable.METADATA.tableName(); + if (range.isMeta()) { + targetSystemTable = AccumuloTable.ROOT.tableName(); + } + final Pair<KeyExtent,KeyExtent> startAndEndTablets; + + final AccumuloClient client = manager.getContext(); + + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { + final Text startRow = range.prevEndRow(); + final Text endRow = range.endRow() != null + ? new Key(range.endRow()).followingKey(PartialKey.ROW).getRow() : null; + + // Find the tablets that overlap the start and end row of the deletion range + // If the startRow is null then there will be an empty startTablet we don't need + // to fence a starting tablet as we are deleting everything up to the end tablet + // Likewise, if the endRow is null there will be an empty endTablet as we are deleting + // all tablets after the starting tablet + final Optional<TabletMetadata> startTablet = Optional.ofNullable(startRow).flatMap( + row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES)); + final Optional<TabletMetadata> endTablet = Optional.ofNullable(endRow).flatMap( + row -> loadTabletMetadata(range.tableId(), row, ColumnType.PREV_ROW, ColumnType.FILES)); + + // Store the tablets in a Map if present so that if we have the same Tablet we + // only need to process the same tablet once when fencing + final SortedMap<KeyExtent,TabletMetadata> tabletMetadatas = new TreeMap<>(); + startTablet.ifPresent(ft -> tabletMetadatas.put(ft.getExtent(), ft)); + endTablet.ifPresent(lt -> tabletMetadatas.putIfAbsent(lt.getExtent(), lt)); + + // Capture the tablets to return them or null if not loaded + startAndEndTablets = new Pair<>(startTablet.map(TabletMetadata::getExtent).orElse(null), + endTablet.map(TabletMetadata::getExtent).orElse(null)); + + for (TabletMetadata tabletMetadata : tabletMetadatas.values()) { + final KeyExtent keyExtent = tabletMetadata.getExtent(); + + // Check if this tablet needs to have its files fenced for the deletion + if (needsFencingForDeletion(info, keyExtent)) { + Manager.log.debug("Found overlapping keyExtent {} for delete, fencing files.", keyExtent); + + // Create the ranges for fencing the files, this takes the place of + // chop compactions and splits + final List<Range> ranges = createRangesForDeletion(tabletMetadata, range); + Preconditions.checkState(!ranges.isEmpty(), + "No ranges found that overlap deletion range."); + + // Go through and fence each of the files that are part of the tablet + for (Entry<StoredTabletFile,DataFileValue> entry : tabletMetadata.getFilesMap() + .entrySet()) { + final StoredTabletFile existing = entry.getKey(); + final DataFileValue value = entry.getValue(); + + final Mutation m = new Mutation(keyExtent.toMetaRow()); + + // Go through each range that was created and modify the metadata for the file + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row. + final Set<StoredTabletFile> newFiles = new HashSet<>(); + final Set<StoredTabletFile> existingFile = Set.of(existing); + + for (Range fenced : ranges) { + // Clip range with the tablet range if the range already exists + fenced = existing.hasRange() ? existing.getRange().clip(fenced, true) : fenced; + + // If null the range is disjoint which can happen if there are existing fenced files + // If the existing file is disjoint then later we will delete if the file is not part + // of the newFiles set which means it is disjoint with all ranges + if (fenced != null) { + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + Manager.log.trace("Adding new file {} with range {}", newFile.getMetadataPath(), + newFile.getRange()); + + // Add the new file to the newFiles set, it will be added later if it doesn't match + // the existing file already. We still need to add to the set to be checked later + // even if it matches the existing file as later the deletion logic will check to + // see if the existing file is part of this set before deleting. This is done to + // make sure the existing file isn't deleted unless it is not needed/disjoint + // with all ranges. + newFiles.add(newFile); + } else { + Manager.log.trace("Found a disjoint file {} with range {} on delete", + existing.getMetadataPath(), existing.getRange()); + } + } + + // If the existingFile is not contained in the newFiles set then we can delete it + Sets.difference(existingFile, newFiles).forEach( + delete -> m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText())); + + // Add any new files that don't match the existingFile + // As of now we will only have at most 2 files as up to 2 ranges are created + final List<StoredTabletFile> filesToAdd = + new ArrayList<>(Sets.difference(newFiles, existingFile)); + Preconditions.checkArgument(filesToAdd.size() <= 2, + "There should only be at most 2 StoredTabletFiles after computing new ranges."); + + // If more than 1 new file then re-calculate the num entries and size + if (filesToAdd.size() == 2) { + // This splits up the values in half and makes sure they total the original + // values + final Pair<DataFileValue,DataFileValue> newDfvs = computeNewDfv(value); + m.put(DataFileColumnFamily.NAME, filesToAdd.get(0).getMetadataText(), + newDfvs.getFirst().encodeAsValue()); + m.put(DataFileColumnFamily.NAME, filesToAdd.get(1).getMetadataText(), + newDfvs.getSecond().encodeAsValue()); + } else { + // Will be 0 or 1 files + filesToAdd.forEach(newFile -> m.put(DataFileColumnFamily.NAME, + newFile.getMetadataText(), value.encodeAsValue())); + } + + if (!m.getUpdates().isEmpty()) { + bw.addMutation(m); + } + } + } else { + Manager.log.debug( + "Skipping metadata update on file for keyExtent {} for delete as not overlapping on rows.", + keyExtent); + } + } + + bw.flush(); + + return startAndEndTablets; + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + // Divide each new DFV in half and make sure the sum equals the original + @VisibleForTesting + protected static Pair<DataFileValue,DataFileValue> computeNewDfv(DataFileValue value) { + final DataFileValue file1Value = new DataFileValue(Math.max(1, value.getSize() / 2), + Math.max(1, value.getNumEntries() / 2), value.getTime()); + + final DataFileValue file2Value = + new DataFileValue(Math.max(1, value.getSize() - file1Value.getSize()), + Math.max(1, value.getNumEntries() - file1Value.getNumEntries()), value.getTime()); + + return new Pair<>(file1Value, file2Value); + } + + private Optional<TabletMetadata> loadTabletMetadata(TableId tabletId, final Text row, + ColumnType... columns) { + try (TabletsMetadata tabletsMetadata = manager.getContext().getAmple().readTablets() + .forTable(tabletId).overlapping(row, true, row).fetch(columns).build()) { + return tabletsMetadata.stream().findFirst(); + } + } + private KeyExtent deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, AccumuloClient client) throws TableNotFoundException, AccumuloException { Scanner scanner;