This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit a68eba3bb91a08e29836ce59d3a4cd5895e64787
Merge: 87d5b7fc7f 19a9ad9d65
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 2 18:53:50 2025 +0000

    Merge branch '2.1'

 .../apache/accumulo/core/logging/BulkLogger.java   | 67 ++++++++++++++++++++++
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  2 +
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  8 +--
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  |  5 ++
 5 files changed, 80 insertions(+), 6 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
index 0000000000,90d5552984..306dd154ba
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
@@@ -1,0 -1,71 +1,67 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.logging;
+ 
+ import java.util.Map;
+ 
 -import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.TableId;
 -import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
++import org.apache.accumulo.core.fate.FateId;
++import org.apache.accumulo.core.metadata.StoredTabletFile;
+ import org.apache.accumulo.core.metadata.TabletFile;
+ import org.apache.hadoop.fs.Path;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * This logger tracks significant changes that bulk import v2 operations make to DFS and tablets.
+  */
+ public class BulkLogger {
+   private static final Logger log = LoggerFactory.getLogger(Logging.PREFIX + "bulk");
+ 
 -  public static void initiating(long fateId, TableId tableId, boolean setTime, String sourceDir,
++  public static void initiating(FateId fateId, TableId tableId, boolean setTime, String sourceDir,
+       String destinationDir) {
+     // Log the key pieces of information about a bulk import in a single log message to tie them all
+     // together.
 -    log.info("{} initiating bulk import, tableId:{} setTime:{} source:{} destination:{}",
 -        FateTxId.formatTid(fateId), tableId, setTime, sourceDir, destinationDir);
++    log.info("{} initiating bulk import, tableId:{} setTime:{} source:{} destination:{}", fateId,
++        tableId, setTime, sourceDir, destinationDir);
+   }
+ 
 -  public static void renamed(String fateId, Path source, Path destination) {
++  public static void renamed(FateId fateId, Path source, Path destination) {
+     // The initiating message logged the full directory paths, so do not need to repeat that
+     // information here. Log the bulk destination directory as it is unique and easy to search for.
+     log.debug("{} renamed {} to {}/{}", fateId, source.getName(), destination.getParent().getName(),
+         destination.getName());
+   }
+ 
+   /**
+    * This is called when a bulk load operation is cleaning up load entries in the metadata table.
+    * Turning this on allows seeing what files were loaded into which tablets by a bulk load
+    * operation. The information logged is redundant with
+    * {@link TabletLogger#bulkImported(KeyExtent, TabletFile)} except that it will happen on the
+    * manager instead of the tserver.
+    */
 -  public static void deletingLoadEntry(Map.Entry<Key,Value> entry) {
++  public static void deletingLoadEntry(KeyExtent extent, Map.Entry<StoredTabletFile,FateId> entry) {
+     if (log.isTraceEnabled()) {
 -      var key = entry.getKey();
 -      // the column qualifier contains the file loaded
 -      var path = new Path(key.getColumnQualifierData().toString());
 -      // the value is the fate id
++      var path = entry.getKey().getPath();
+       log.trace("{} loaded {}/{} into {}", entry.getValue(), path.getParent().getName(),
 -          path.getName(), key.getRowData());
++          path.getName(), extent);
+     }
+   }
+ }
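For reference, a minimal compile-level sketch of how two of the entry points added
above are meant to be called. The BulkLogger method signatures come from the diff;
the class name, method names, and any values passed in are illustrative placeholders,
not output from a real import.

import java.util.Map;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.logging.BulkLogger;
import org.apache.accumulo.core.metadata.StoredTabletFile;

class BulkLoggerCallSketch {

  // PrepBulkImport calls initiating() once per import, so a single info line ties
  // together the fate id, table id, setTime flag, and both directories; grepping
  // for the fate id then recovers the whole story of the import.
  static void onPrepared(FateId fateId, TableId tableId, boolean setTime, String sourceDir,
      String bulkDir) {
    BulkLogger.initiating(fateId, tableId, setTime, sourceDir, bulkDir);
  }

  // CleanUpBulkImport calls deletingLoadEntry() per loaded-file entry being removed;
  // at trace level it records which file was loaded into which tablet, on the
  // manager rather than the tserver.
  static void onCleanup(KeyExtent extent, Map.Entry<StoredTabletFile,FateId> loadedEntry) {
    BulkLogger.deletingLoadEntry(extent, loadedEntry);
  }
}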
+ log.debug("{} renamed {} to {}/{}", fateId, source.getName(), destination.getParent().getName(), + destination.getName()); + } + + /** + * This is called when a bulk load operation is cleaning up load entries in the metadata table. + * Turning this on allows seeing what files were loaded into which tablets by a bulk load + * operation. The information logged is redundant with + * {@link TabletLogger#bulkImported(KeyExtent, TabletFile)} except that it will happen on the + * manager instead of the tserver. + */ - public static void deletingLoadEntry(Map.Entry<Key,Value> entry) { ++ public static void deletingLoadEntry(KeyExtent extent, Map.Entry<StoredTabletFile,FateId> entry) { + if (log.isTraceEnabled()) { - var key = entry.getKey(); - // the column qualifier contains the file loaded - var path = new Path(key.getColumnQualifierData().toString()); - // the value is the fate id ++ var path = entry.getKey().getPath(); + log.trace("{} loaded {}/{} into {}", entry.getValue(), path.getParent().getName(), - path.getName(), key.getRowData()); ++ path.getName(), extent); + } + } + } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index cc8a5a6238,6eb247b33c..7a0022acf1 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@@ -26,12 -29,17 +26,13 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; + import org.apache.accumulo.core.logging.BulkLogger; -import org.apache.accumulo.core.manager.state.tables.TableState; -import org.apache.accumulo.core.master.thrift.BulkImportState; -import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.core.manager.thrift.BulkImportState; +import org.apache.accumulo.manager.tableOps.AbstractFateOperation; +import org.apache.accumulo.manager.tableOps.FateEnv; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -68,15 -76,17 +69,13 @@@ class BulkImportMove extends AbstractFa final Path bulkDir = new Path(bulkInfo.bulkDir); final Path sourceDir = new Path(bulkInfo.sourceDir); - log.debug("{} sourceDir {}", fateId, sourceDir); - VolumeManager fs = manager.getVolumeManager(); -- - if (bulkInfo.tableState == TableState.ONLINE) { - ZooArbitrator.start(manager.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid); - } + VolumeManager fs = env.getVolumeManager(); try { - manager.updateBulkImportStatus(sourceDir.toString(), BulkImportState.MOVING); + env.updateBulkImportStatus(sourceDir.toString(), BulkImportState.MOVING); Map<String,String> oldToNewNameMap = BulkSerialize.readRenameMap(bulkDir.toString(), fs::open); - moveFiles(tid, sourceDir, bulkDir, manager, fs, oldToNewNameMap); + moveFiles(fateId, sourceDir, bulkDir, env, fs, oldToNewNameMap); return new LoadFiles(bulkInfo); } catch 
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 47110524f9,b26b720f7c..f22127d962
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@@ -19,27 -19,19 +19,28 @@@
  package org.apache.accumulo.manager.tableOps.bulkVer2;
  
  import java.io.IOException;
 +import java.time.Duration;
  import java.util.Collections;
 +import java.util.Map;
 +import java.util.Optional;
  
  import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.data.AbstractId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.zookeeper.LockRange;
  import org.apache.accumulo.core.gc.ReferenceFile;
 -import org.apache.accumulo.core.manager.state.tables.TableState;
 -import org.apache.accumulo.core.master.thrift.BulkImportState;
++import org.apache.accumulo.core.logging.BulkLogger;
 +import org.apache.accumulo.core.manager.thrift.BulkImportState;
  import org.apache.accumulo.core.metadata.schema.Ample;
 -import org.apache.accumulo.manager.Manager;
 -import org.apache.accumulo.manager.tableOps.ManagerRepo;
 +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
 +import org.apache.accumulo.manager.tableOps.FateEnv;
  import org.apache.accumulo.manager.tableOps.Utils;
 -import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
@@@ -81,66 -76,17 +82,67 @@@ public class CleanUpBulkImport extends
      Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
      Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
      try {
 -      manager.getVolumeManager().delete(renamingFile);
 -      manager.getVolumeManager().delete(mappingFile);
 +      env.getVolumeManager().delete(renamingFile);
 +      env.getVolumeManager().delete(mappingFile);
      } catch (IOException ioe) {
 -      log.debug("{} Failed to delete renames and/or loadmap", FateTxId.formatTid(tid), ioe);
 +      log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe);
      }
  
 -    log.debug("completing bulkDir import transaction " + FateTxId.formatTid(tid));
 -    if (info.tableState == TableState.ONLINE) {
 -      ZooArbitrator.cleanup(manager.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
 -    }
 -    manager.removeBulkImportStatus(info.sourceDir);
 +    log.debug("completing bulkDir import transaction " + fateId);
 +    env.removeBulkImportStatus(info.sourceDir);
      return null;
    }
 +
 +  private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId,
 +      Text firstSplit, Text lastSplit) {
 +
 +    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
 +        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
 +        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (true) {
 +      try (
 +          var tablets = ample.readTablets().forTable(tableId).overlapping(firstSplit, lastSplit)
 +              .checkConsistency().fetch(ColumnType.PREV_ROW, ColumnType.LOADED).build();
 +          var tabletsMutator = ample.conditionallyMutateTablets()) {
 +
 +        for (var tablet : tablets) {
 +          if (tablet.getLoaded().values().stream()
 +              .anyMatch(loadedFateId -> loadedFateId.equals(fateId))) {
 +            var tabletMutator =
 +                tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
 +            tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId))
++                .peek(entry -> BulkLogger.deletingLoadEntry(tablet.getExtent(), entry))
 +                .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
 +            tabletMutator.submit(tm -> false, () -> "remove bulk load entries " + fateId);
 +          }
 +        }
 +
 +        var results = tabletsMutator.process();
 +
 +        if (results.values().stream()
 +            .anyMatch(condResult -> condResult.getStatus() != Status.ACCEPTED)) {
 +
 +          results.forEach((extent, condResult) -> {
 +            if (condResult.getStatus() != Status.ACCEPTED) {
 +              var metadata = Optional.ofNullable(condResult.readMetadata());
 +              log.debug("Tablet update failed {} {} {} {} ", fateId, extent, condResult.getStatus(),
 +                  metadata.map(TabletMetadata::getOperationId).map(AbstractId::toString)
 +                      .orElse("tablet is gone"));
 +            }
 +          });
 +
 +          try {
 +            retry.waitForNextAttempt(log,
 +                String.format("%s tableId:%s conditional mutations to delete load markers failed.",
 +                    fateId, tableId));
 +          } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
 +          }
 +        } else {
 +          break;
 +        }
 +      }
 +    }
 +  }
  }
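For readers unfamiliar with the Retry builder used in removeBulkLoadEntries above,
here is a standalone sketch of the backoff schedule those builder calls appear to
configure. It assumes retryAfter sets the initial wait, the wait then grows
geometrically by the backoff factor, and maxWait caps it; this is one reading of
the builder, not Accumulo code, and it stops after five attempts where the real
loop retries until every conditional mutation is ACCEPTED.

class BackoffSketch {
  public static void main(String[] args) throws InterruptedException {
    long waitMillis = 100;           // retryAfter(Duration.ofMillis(100))
    final long maxWaitMillis = 1000; // maxWait(Duration.ofSeconds(1))

    for (int attempt = 1; attempt <= 5; attempt++) {
      System.out.printf("attempt %d: waiting %dms%n", attempt, waitMillis);
      Thread.sleep(waitMillis);
      // Grow the wait geometrically (backOffFactor(1.5)) and clamp it to the
      // configured maximum so retries settle at a steady 1s cadence.
      waitMillis = Math.min((long) (waitMillis * 1.5), maxWaitMillis);
    }
  }
}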
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 88238036e3,e26db3c4e6..dae7fe4b2f
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@@ -103,33 -107,27 +103,33 @@@ class LoadFiles extends AbstractFateOpe
    }
  
    @Override
 -  public long isReady(long tid, Manager manager) throws Exception {
 -    log.trace("Starting for {} (tid = {})", bulkInfo.sourceDir, FateTxId.formatTid(tid));
 -    if (manager.onlineTabletServers().isEmpty()) {
 -      log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
 -          + FateTxId.formatTid(tid) + ")");
 +  public long isReady(FateId fateId, FateEnv env) throws Exception {
-     log.info("Starting for {} (tid = {})", bulkInfo.sourceDir, fateId);
++    log.trace("Starting for {} (tid = {})", bulkInfo.sourceDir, fateId);
 +    if (env.onlineTabletServers().isEmpty()) {
 +      log.warn("There are no tablet server to process bulkDir import, waiting (fateId = " + fateId
 +          + ")");
        return 100;
      }
 -    VolumeManager fs = manager.getVolumeManager();
 +    VolumeManager fs = env.getVolumeManager();
      final Path bulkDir = new Path(bulkInfo.bulkDir);
 -    manager.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
 -    try (
 -        LoadMappingIterator lmi =
 -            BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open);
 -        Loader loader = (bulkInfo.tableState == TableState.ONLINE
 -            ? new OnlineLoader(manager.getConfiguration()) : new OfflineLoader())) {
 -      TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(manager.getContext())
 +    env.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
 +    try (LoadMappingIterator lmi =
 +        BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
 +
 +      Loader loader = new Loader(env, bulkInfo.tableId);
 +
 +      List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, LOCATION, LOADED, TIME));
 +      if (loader.pauseLimit > 0) {
 +        fetchCols.add(FILES);
 +      }
 +
 +      TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(env.getContext())
          .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency()
 -        .fetch(PREV_ROW, LOCATION, LOADED).build();
 -      int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
 +        .fetch(fetchCols.toArray(new ColumnType[0])).build();
 +
 +      int skip = env.getContext().getTableConfiguration(bulkInfo.tableId)
          .getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
 -      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid, skip);
 +      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, fateId, skip);
      }
    }
@@@ -438,13 -540,13 +438,13 @@@
  
      log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
  
 -    if (importTimingStats.callCount > 0) {
 -      log.debug(
 +    if (importTimingStats.callCount > 0 && log.isTraceEnabled()) {
 +      log.trace(
          "Stats for {} (tid = {}): processed {} tablets in {} calls which took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% of the processing time.",
 -        bulkInfo.sourceDir, FateTxId.formatTid(tid), importTimingStats.tabletCount,
 -        importTimingStats.callCount, totalProcessingTime.toMillis(),
 -        totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
 -        importTimingStats.totalWastedTime.toMillis(), importTimingStats.totalWastedTime.toNanos(),
 +        bulkInfo.sourceDir, fateId, importTimingStats.tabletCount, importTimingStats.callCount,
 +        totalProcessingTime.toMillis(), totalProcessingTime.toNanos(),
 +        importTimingStats.wastedIterations, importTimingStats.totalWastedTime.toMillis(),
 +        importTimingStats.totalWastedTime.toNanos(),
          (importTimingStats.totalWastedTime.toNanos() * 100) / totalProcessingTime.toNanos());
      }
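The stats hunk above demotes an expensive multi-argument message to trace and
wraps it in log.isTraceEnabled(). A minimal slf4j sketch of the same guard
pattern, with illustrative class and argument names:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TraceGuardSketch {
  private static final Logger log = LoggerFactory.getLogger(TraceGuardSketch.class);

  static void logStats(long tabletCount, long callCount) {
    // Guarding keeps the argument computation (trivial here; in LoadFiles a
    // series of Duration conversions plus a division) from running at all when
    // trace logging is disabled.
    if (callCount > 0 && log.isTraceEnabled()) {
      log.trace("processed {} tablets in {} calls", tabletCount, callCount);
    }
  }
}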
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index f303900b64,0c1c5d7473..78a2303318
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@@ -42,11 -42,10 +42,12 @@@ import org.apache.accumulo.core.clientI
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
 +import org.apache.accumulo.core.fate.zookeeper.LockRange;
  import org.apache.accumulo.core.file.FilePrefix;
+ import org.apache.accumulo.core.logging.BulkLogger;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.util.PeekingIterator;
@@@ -287,6 -291,10 +288,10 @@@ public class PrepBulkImport extends Abs
      BulkSerialize.writeRenameMap(oldToNewNameMap, bulkDir.toString(), fs::create);
      bulkInfo.bulkDir = bulkDir.toString();
+ 
 -    BulkLogger.initiating(tid, bulkInfo.tableId, bulkInfo.setTime, bulkInfo.sourceDir,
++    BulkLogger.initiating(fateId, bulkInfo.tableId, bulkInfo.setTime, bulkInfo.sourceDir,
+         bulkInfo.bulkDir);
+ 
      // return the next step, which will move files
      return new BulkImportMove(bulkInfo);
    }
