This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit a68eba3bb91a08e29836ce59d3a4cd5895e64787
Merge: 87d5b7fc7f 19a9ad9d65
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 2 18:53:50 2025 +0000

    Merge branch '2.1'

 .../apache/accumulo/core/logging/BulkLogger.java   | 67 ++++++++++++++++++++++
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  2 +
 .../manager/tableOps/bulkVer2/LoadFiles.java       |  8 +--
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  |  5 ++
 5 files changed, 80 insertions(+), 6 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
index 0000000000,90d5552984..306dd154ba
mode 000000,100644..100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/BulkLogger.java
@@@ -1,0 -1,71 +1,67 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.core.logging;
+ 
+ import java.util.Map;
+ 
 -import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.TableId;
 -import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
++import org.apache.accumulo.core.fate.FateId;
++import org.apache.accumulo.core.metadata.StoredTabletFile;
+ import org.apache.accumulo.core.metadata.TabletFile;
+ import org.apache.hadoop.fs.Path;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * This logger tracks significant changes that bulk import v2 operations make to DFS and tablets.
+  */
+ public class BulkLogger {
+   private static final Logger log = LoggerFactory.getLogger(Logging.PREFIX + "bulk");
+ 
 -  public static void initiating(long fateId, TableId tableId, boolean setTime, String sourceDir,
++  public static void initiating(FateId fateId, TableId tableId, boolean setTime, String sourceDir,
+       String destinationDir) {
+     // Log the key pieces of information about a bulk import in a single log message to tie them all
+     // together.
 -    log.info("{} initiating bulk import, tableId:{} setTime:{} source:{} 
destination:{}",
 -        FateTxId.formatTid(fateId), tableId, setTime, sourceDir, 
destinationDir);
++    log.info("{} initiating bulk import, tableId:{} setTime:{} source:{} 
destination:{}", fateId,
++        tableId, setTime, sourceDir, destinationDir);
+   }
+ 
 -  public static void renamed(String fateId, Path source, Path destination) {
++  public static void renamed(FateId fateId, Path source, Path destination) {
+     // The initiating message logged the full directory paths, so do not need to repeat that
+     // information here. Log the bulk destination directory as it is unique and easy to search for.
+     log.debug("{} renamed {} to {}/{}", fateId, source.getName(), destination.getParent().getName(),
+         destination.getName());
+   }
+ 
+   /**
+    * This is called when a bulk load operation is cleaning up load entries in the metadata table.
+    * Turning this on allows seeing what files were loaded into which tablets by a bulk load
+    * operation. The information logged is redundant with
+    * {@link TabletLogger#bulkImported(KeyExtent, TabletFile)} except that it will happen on the
+    * manager instead of the tserver.
+    */
 -  public static void deletingLoadEntry(Map.Entry<Key,Value> entry) {
++  public static void deletingLoadEntry(KeyExtent extent, Map.Entry<StoredTabletFile,FateId> entry) {
+     if (log.isTraceEnabled()) {
 -      var key = entry.getKey();
 -      // the column qualifier contains the file loaded
 -      var path = new Path(key.getColumnQualifierData().toString());
 -      // the value is the fate id
++      var path = entry.getKey().getPath();
+       log.trace("{} loaded {}/{} into {}", entry.getValue(), path.getParent().getName(),
 -          path.getName(), key.getRowData());
++          path.getName(), extent);
+     }
+   }
+ }
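
For context, a minimal caller sketch of the BulkLogger API added above (not part of this patch; the FateId.from/TableId.of factory calls and all paths below are illustrative assumptions, not taken from the commit):

    // Hypothetical caller sketch; real callers receive these ids from the
    // running FATE operation rather than constructing them.
    import java.util.UUID;

    import org.apache.accumulo.core.data.TableId;
    import org.apache.accumulo.core.fate.FateId;
    import org.apache.accumulo.core.fate.FateInstanceType;
    import org.apache.accumulo.core.logging.BulkLogger;
    import org.apache.hadoop.fs.Path;

    public class BulkLoggerSketch {
      public static void main(String[] args) {
        // Assumed factory methods; ids are made up for illustration.
        FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
        TableId tableId = TableId.of("42");

        // One INFO message ties together the key facts of a bulk import.
        BulkLogger.initiating(fateId, tableId, true, "/user/bulk/in",
            "/accumulo/tables/42/b-0000001");

        // One DEBUG message per renamed file; only the parent dir and file
        // name are logged, since initiating() already logged the full paths.
        BulkLogger.renamed(fateId, new Path("/user/bulk/in/f1.rf"),
            new Path("/accumulo/tables/42/b-0000001/I0000001.rf"));
      }
    }
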
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index cc8a5a6238,6eb247b33c..7a0022acf1
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@@ -26,12 -29,17 +26,13 @@@ import org.apache.accumulo.core.clientI
  import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
  import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
  import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 -import org.apache.accumulo.core.conf.AccumuloConfiguration;
 -import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
+ import org.apache.accumulo.core.logging.BulkLogger;
 -import org.apache.accumulo.core.manager.state.tables.TableState;
 -import org.apache.accumulo.core.master.thrift.BulkImportState;
 -import org.apache.accumulo.manager.Manager;
 -import org.apache.accumulo.manager.tableOps.ManagerRepo;
 +import org.apache.accumulo.core.manager.thrift.BulkImportState;
 +import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
 +import org.apache.accumulo.manager.tableOps.FateEnv;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
  import org.apache.hadoop.fs.Path;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -68,15 -76,17 +69,13 @@@ class BulkImportMove extends AbstractFa
      final Path bulkDir = new Path(bulkInfo.bulkDir);
      final Path sourceDir = new Path(bulkInfo.sourceDir);
  
-     log.debug("{} sourceDir {}", fateId, sourceDir);
 -    VolumeManager fs = manager.getVolumeManager();
--
 -    if (bulkInfo.tableState == TableState.ONLINE) {
 -      ZooArbitrator.start(manager.getContext(), Constants.BULK_ARBITRATOR_TYPE, tid);
 -    }
 +    VolumeManager fs = env.getVolumeManager();
  
      try {
 -      manager.updateBulkImportStatus(sourceDir.toString(), BulkImportState.MOVING);
 +      env.updateBulkImportStatus(sourceDir.toString(), BulkImportState.MOVING);
        Map<String,String> oldToNewNameMap =
            BulkSerialize.readRenameMap(bulkDir.toString(), fs::open);
 -      moveFiles(tid, sourceDir, bulkDir, manager, fs, oldToNewNameMap);
 +      moveFiles(fateId, sourceDir, bulkDir, env, fs, oldToNewNameMap);
  
        return new LoadFiles(bulkInfo);
      } catch (Exception ex) {
@@@ -101,7 -116,8 +100,8 @@@
        oldToNewMap.put(originalPath, newPath);
      }
      try {
 -      fs.bulkRename(oldToNewMap, workerCount, BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid);
 -      oldToNewMap.forEach((oldPath, newPath) -> BulkLogger.renamed(fmtTid, oldPath, newPath));
 +      fs.bulkRename(oldToNewMap, env.getRenamePool(), fateId);
++      oldToNewMap.forEach((oldPath, newPath) -> BulkLogger.renamed(fateId, oldPath, newPath));
      } catch (IOException ioe) {
        throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null,
            TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER,
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 47110524f9,b26b720f7c..f22127d962
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@@ -19,27 -19,19 +19,28 @@@
  package org.apache.accumulo.manager.tableOps.bulkVer2;
  
  import java.io.IOException;
 +import java.time.Duration;
  import java.util.Collections;
 +import java.util.Map;
 +import java.util.Optional;
  
  import org.apache.accumulo.core.Constants;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.data.AbstractId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.zookeeper.LockRange;
  import org.apache.accumulo.core.gc.ReferenceFile;
 -import org.apache.accumulo.core.manager.state.tables.TableState;
 -import org.apache.accumulo.core.master.thrift.BulkImportState;
++import org.apache.accumulo.core.logging.BulkLogger;
 +import org.apache.accumulo.core.manager.thrift.BulkImportState;
  import org.apache.accumulo.core.metadata.schema.Ample;
 -import org.apache.accumulo.manager.Manager;
 -import org.apache.accumulo.manager.tableOps.ManagerRepo;
 +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
 +import org.apache.accumulo.manager.tableOps.FateEnv;
  import org.apache.accumulo.manager.tableOps.Utils;
 -import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  import org.slf4j.Logger;
@@@ -81,66 -76,17 +82,67 @@@ public class CleanUpBulkImport extends 
      Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
      Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
      try {
 -      manager.getVolumeManager().delete(renamingFile);
 -      manager.getVolumeManager().delete(mappingFile);
 +      env.getVolumeManager().delete(renamingFile);
 +      env.getVolumeManager().delete(mappingFile);
      } catch (IOException ioe) {
 -      log.debug("{} Failed to delete renames and/or loadmap", 
FateTxId.formatTid(tid), ioe);
 +      log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe);
      }
  
 -    log.debug("completing bulkDir import transaction " + 
FateTxId.formatTid(tid));
 -    if (info.tableState == TableState.ONLINE) {
 -      ZooArbitrator.cleanup(manager.getContext(), 
Constants.BULK_ARBITRATOR_TYPE, tid);
 -    }
 -    manager.removeBulkImportStatus(info.sourceDir);
 +    log.debug("completing bulkDir import transaction " + fateId);
 +    env.removeBulkImportStatus(info.sourceDir);
      return null;
    }
 +
 +  private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId,
 +      Text firstSplit, Text lastSplit) {
 +
 +    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
 +        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
 +        .logInterval(Duration.ofMinutes(3)).createRetry();
 +
 +    while (true) {
 +      try (
 +          var tablets = ample.readTablets().forTable(tableId).overlapping(firstSplit, lastSplit)
 +              .checkConsistency().fetch(ColumnType.PREV_ROW, ColumnType.LOADED).build();
 +          var tabletsMutator = ample.conditionallyMutateTablets()) {
 +
 +        for (var tablet : tablets) {
 +          if (tablet.getLoaded().values().stream()
 +              .anyMatch(loadedFateId -> loadedFateId.equals(fateId))) {
 +            var tabletMutator =
 +                tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
 +            tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId))
++                .peek(entry -> BulkLogger.deletingLoadEntry(tablet.getExtent(), entry))
 +                .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
 +            tabletMutator.submit(tm -> false, () -> "remove bulk load entries " + fateId);
 +          }
 +        }
 +
 +        var results = tabletsMutator.process();
 +
 +        if (results.values().stream()
 +            .anyMatch(condResult -> condResult.getStatus() != Status.ACCEPTED)) {
 +
 +          results.forEach((extent, condResult) -> {
 +            if (condResult.getStatus() != Status.ACCEPTED) {
 +              var metadata = Optional.ofNullable(condResult.readMetadata());
 +              log.debug("Tablet update failed {} {} {} {} ", fateId, extent, 
condResult.getStatus(),
 +                  
metadata.map(TabletMetadata::getOperationId).map(AbstractId::toString)
 +                      .orElse("tablet is gone"));
 +            }
 +          });
 +
 +          try {
 +            retry.waitForNextAttempt(log,
 +                String.format("%s tableId:%s conditional mutations to delete load markers failed.",
 +                    fateId, tableId));
 +          } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
 +          }
 +        } else {
 +          break;
 +        }
 +      }
 +    }
 +  }
  }
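
For reference, a condensed sketch of the retry-until-accepted pattern that removeBulkLoadEntries uses above (the Retry builder chain is copied from the patch; tryOneRound is a hypothetical stand-in for one pass of building and processing conditional mutations):

    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    import org.apache.accumulo.core.util.Retry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RetryLoopSketch {
      private static final Logger log = LoggerFactory.getLogger(RetryLoopSketch.class);

      // Repeats the work unit until it reports success, backing off between
      // attempts; mirrors the while(true)/waitForNextAttempt loop in the patch.
      static void runUntilAccepted(BooleanSupplier tryOneRound, String description) {
        Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
            .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
            .logInterval(Duration.ofMinutes(3)).createRetry();

        while (!tryOneRound.getAsBoolean()) {
          try {
            retry.waitForNextAttempt(log, description);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        }
      }
    }
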
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 88238036e3,e26db3c4e6..dae7fe4b2f
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@@ -103,33 -107,27 +103,33 @@@ class LoadFiles extends AbstractFateOpe
    }
  
    @Override
 -  public long isReady(long tid, Manager manager) throws Exception {
 -    log.trace("Starting for {} (tid = {})", bulkInfo.sourceDir, 
FateTxId.formatTid(tid));
 -    if (manager.onlineTabletServers().isEmpty()) {
 -      log.warn("There are no tablet server to process bulkDir import, waiting 
(tid = "
 -          + FateTxId.formatTid(tid) + ")");
 +  public long isReady(FateId fateId, FateEnv env) throws Exception {
-     log.info("Starting for {} (tid = {})", bulkInfo.sourceDir, fateId);
++    log.trace("Starting for {} (tid = {})", bulkInfo.sourceDir, fateId);
 +    if (env.onlineTabletServers().isEmpty()) {
 +      log.warn("There are no tablet server to process bulkDir import, waiting 
(fateId = " + fateId
 +          + ")");
        return 100;
      }
 -    VolumeManager fs = manager.getVolumeManager();
 +    VolumeManager fs = env.getVolumeManager();
      final Path bulkDir = new Path(bulkInfo.bulkDir);
 -    manager.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
 -    try (
 -        LoadMappingIterator lmi =
 -            BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open);
 -        Loader loader = (bulkInfo.tableState == TableState.ONLINE
 -            ? new OnlineLoader(manager.getConfiguration()) : new OfflineLoader())) {
 -      TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(manager.getContext())
 +    env.updateBulkImportStatus(bulkInfo.sourceDir, BulkImportState.LOADING);
 +    try (LoadMappingIterator lmi =
 +        BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
 +
 +      Loader loader = new Loader(env, bulkInfo.tableId);
 +
 +      List<ColumnType> fetchCols = new ArrayList<>(List.of(PREV_ROW, 
LOCATION, LOADED, TIME));
 +      if (loader.pauseLimit > 0) {
 +        fetchCols.add(FILES);
 +      }
 +
 +      TabletsMetadataFactory tmf = (startRow) -> TabletsMetadata.builder(env.getContext())
           .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency()
 -          .fetch(PREV_ROW, LOCATION, LOADED).build();
 -      int skip = manager.getContext().getTableConfiguration(bulkInfo.tableId)
 +          .fetch(fetchCols.toArray(new ColumnType[0])).build();
 +
 +      int skip = env.getContext().getTableConfiguration(bulkInfo.tableId)
            .getCount(Property.TABLE_BULK_SKIP_THRESHOLD);
 -      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, manager, tid, skip);
 +      return loadFiles(loader, bulkInfo, bulkDir, lmi, tmf, fateId, skip);
      }
    }
  
@@@ -438,13 -540,13 +438,13 @@@
  
      log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
  
-     if (importTimingStats.callCount > 0) {
-       log.debug(
+     if (importTimingStats.callCount > 0 && log.isTraceEnabled()) {
+       log.trace(
            "Stats for {} (tid = {}): processed {} tablets in {} calls which 
took {}ms ({} nanos). Skipped {} iterations which took {}ms ({} nanos) or {}% 
of the processing time.",
 -          bulkInfo.sourceDir, FateTxId.formatTid(tid), importTimingStats.tabletCount,
 -          importTimingStats.callCount, totalProcessingTime.toMillis(),
 -          totalProcessingTime.toNanos(), importTimingStats.wastedIterations,
 -          importTimingStats.totalWastedTime.toMillis(), importTimingStats.totalWastedTime.toNanos(),
 +          bulkInfo.sourceDir, fateId, importTimingStats.tabletCount, importTimingStats.callCount,
 +          totalProcessingTime.toMillis(), totalProcessingTime.toNanos(),
 +          importTimingStats.wastedIterations, importTimingStats.totalWastedTime.toMillis(),
 +          importTimingStats.totalWastedTime.toNanos(),
          (importTimingStats.totalWastedTime.toNanos() * 100) / totalProcessingTime.toNanos());
      }
  
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
index f303900b64,0c1c5d7473..78a2303318
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
@@@ -42,11 -42,10 +42,12 @@@ import org.apache.accumulo.core.clientI
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.Repo;
 +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
 +import org.apache.accumulo.core.fate.zookeeper.LockRange;
  import org.apache.accumulo.core.file.FilePrefix;
+ import org.apache.accumulo.core.logging.BulkLogger;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
  import org.apache.accumulo.core.util.PeekingIterator;
@@@ -287,6 -291,10 +288,10 @@@ public class PrepBulkImport extends Abs
     BulkSerialize.writeRenameMap(oldToNewNameMap, bulkDir.toString(), fs::create);
  
      bulkInfo.bulkDir = bulkDir.toString();
+ 
 -    BulkLogger.initiating(tid, bulkInfo.tableId, bulkInfo.setTime, bulkInfo.sourceDir,
++    BulkLogger.initiating(fateId, bulkInfo.tableId, bulkInfo.setTime, bulkInfo.sourceDir,
+         bulkInfo.bulkDir);
+ 
      // return the next step, which will move files
      return new BulkImportMove(bulkInfo);
    }
