This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit fa09c1a7f5a1e1cebe2d113c730af8478246ba09
Merge: 22c3e23695 b1e28302ae
Author: Daniel Roberts ddanielr <ddani...@gmail.com>
AuthorDate: Mon Jun 23 17:59:00 2025 +0000

    Merge branch '2.1'
    
    Add location info to ScanfileManager.removeOrphanedScanRefs so the
    code compiles.

 .../core/spi/balancer/HostRegexTableLoadBalancer.java  | 18 ++++++++++--------
 .../manager/balancer/BalancerEnvironmentImpl.java      |  2 +-
 .../java/org/apache/accumulo/tserver/TabletServer.java | 12 ++++++++++++
 .../accumulo/tserver/tablet/ScanfileManager.java       | 16 ++++++++++++++++
 .../org/apache/accumulo/tserver/tablet/Tablet.java     |  4 ++++
 5 files changed, 43 insertions(+), 9 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index 5d53209057,4637d8c610..3afd93886f
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@@ -370,12 -370,13 +370,13 @@@ public class HostRegexTableLoadBalance
            continue;
          }
        }
-       LOG.debug("Sending {} tablets to balancer for table {} for assignment 
within tservers {}",
-           e.getValue().size(), tableName, currentView.keySet());
+       LOG.debug("Sending {} tablets to balancer for table {} for assignment 
within {} tservers",
+           e.getValue().size(), tableName, currentView.keySet().size());
+       LOG.trace("table {} tserver assignment set {}", tableName, 
currentView.keySet());
        assignmentTimer.restart();
 -      getBalancerForTable(e.getKey())
 -          .getAssignments(new AssignmentParamsImpl(currentView, e.getValue(), 
newAssignments));
 +      getBalancerForTable(e.getKey()).getAssignments(new 
AssignmentParamsImpl(currentView,
 +          params.currentResourceGroups(), e.getValue(), newAssignments));
-       LOG.trace("assignment results table:{} assignments:{} time:{}ms", 
tableName,
+       LOG.debug("assignment results table:{} assignments:{} time:{}ms", 
tableName,
            newAssignments.size(), 
assignmentTimer.elapsed(TimeUnit.MILLISECONDS));
        newAssignments.forEach(params::addAssignment);
      }
@@@ -519,10 -518,11 +520,11 @@@
        }
        ArrayList<TabletMigration> newMigrations = new ArrayList<>();
        balanceTimer.restart();
-       getBalancerForTable(tableId)
-           .balance(new BalanceParamsImpl(currentView, 
params.currentResourceGroups(), migrations,
-               newMigrations, DataLevel.of(tableId), Map.of(tableName, 
tableId)));
-       LOG.trace("balance results tableId:{} migrations:{} time:{}ms", 
tableId, newMigrations.size(),
 -      TabletBalancer tableBalancer = getBalancerForTable(tableId);
 -      tableBalancer.balance(new BalanceParamsImpl(currentView, migrations, 
newMigrations,
 -          params.partitionName() + ":" + tableId, Map.of(tableName, 
tableId)));
++      TabletBalancer tabletBalancer = getBalancerForTable(tableId);
++      tabletBalancer.balance(new BalanceParamsImpl(currentView, 
params.currentResourceGroups(),
++          migrations, newMigrations, DataLevel.of(tableId), Map.of(tableName, 
tableId)));
+       LOG.debug("balance results class:{} tableId:{} migrations:{} time:{}ms",
 -          tableBalancer.getClass().getSimpleName(), tableId, 
newMigrations.size(),
++          tabletBalancer.getClass().getSimpleName(), tableId, 
newMigrations.size(),
            balanceTimer.elapsed(TimeUnit.MILLISECONDS));
  
        if (newMigrations.isEmpty()) {
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
index 38ac1c2684,be6a904880..cdab458649
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java
@@@ -89,10 -86,9 +89,10 @@@ public class BalancerEnvironmentImpl ex
    @Override
    public List<TabletStatistics> listOnlineTabletsForTable(TabletServerId 
tabletServerId,
        TableId tableId) throws AccumuloException, AccumuloSecurityException {
-     log.debug("Scanning tablet server {} for table {}", tabletServerId, 
tableId);
+     log.trace("Scanning tablet server {} for table {}", tabletServerId, 
tableId);
      try {
 -      TabletClientService.Client client = 
ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER,
 +      TabletServerClientService.Client client = ThriftUtil.getClient(
 +          ThriftClientTypes.TABLET_SERVER,
            HostAndPort.fromParts(tabletServerId.getHost(), 
tabletServerId.getPort()), getContext());
        try {
          return client
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1507b4bf9f,d5d411db50..49cb759299
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -293,8 -310,24 +293,20 @@@ public class TabletServer extends Abstr
          }), 5, 5, TimeUnit.SECONDS);
      watchNonCriticalScheduledTask(future);
  
+     ScheduledFuture<?> cleanupTask =
+         
context.getScheduledExecutor().scheduleWithFixedDelay(Threads.createNamedRunnable(
+             "ScanRefCleanupTask", () -> 
getOnlineTablets().values().forEach(tablet -> {
+               try {
+                 tablet.removeOrphanedScanRefs();
+               } catch (Exception e) {
+                 log.error("Error cleaning up stale scan references for tablet 
{}",
+                     tablet.getExtent(), e);
+               }
+             })), 5, 5, TimeUnit.MINUTES);
+     watchNonCriticalScheduledTask(cleanupTask);
+ 
 -    @SuppressWarnings("deprecation")
 -    final long walMaxSize =
 -        aconf.getAsBytes(aconf.resolve(Property.TSERV_WAL_MAX_SIZE, 
Property.TSERV_WALOG_MAX_SIZE));
 -    @SuppressWarnings("deprecation")
 -    final long walMaxAge = aconf
 -        .getTimeInMillis(aconf.resolve(Property.TSERV_WAL_MAX_AGE, 
Property.TSERV_WALOG_MAX_AGE));
 +    final long walMaxSize = aconf.getAsBytes(Property.TSERV_WAL_MAX_SIZE);
 +    final long walMaxAge = aconf.getTimeInMillis(Property.TSERV_WAL_MAX_AGE);
      final long minBlockSize =
          
context.getHadoopConf().getLong("dfs.namenode.fs-limits.min-block-size", 0);
      if (minBlockSize != 0 && minBlockSize > walMaxSize) {
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
index 88d08ec16c,0000000000..68ced7e711
mode 100644,000000..100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
@@@ -1,166 -1,0 +1,182 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +class ScanfileManager {
 +  private final Logger log = LoggerFactory.getLogger(ScanfileManager.class);
 +  private final Tablet tablet;
 +
 +  ScanfileManager(Tablet tablet) {
 +    this.tablet = tablet;
 +  }
 +
 +  private final Set<StoredTabletFile> filesToDeleteAfterScan = new 
HashSet<>();
 +  private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new 
HashMap<>();
 +  private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new 
MapCounter<>();
 +  private long nextScanReservationId = 0;
 +
 +  static void rename(VolumeManager fs, Path src, Path dst) throws IOException 
{
 +    if (!fs.rename(src, dst)) {
 +      throw new IOException("Rename " + src + " to " + dst + " returned false 
");
 +    }
 +  }
 +
++  /**
++   * Removes any scan-in-use metadata entries that were left behind when a 
scan cleanup was
++   * interrupted. Intended to be called periodically to clear these orphaned 
scan refs once their
++   * in-memory reference count reaches zero.
++   */
++  public void removeOrphanedScanRefs() {
++    Set<StoredTabletFile> snapshot;
++    Location currLoc;
++    synchronized (tablet) {
++      snapshot = new HashSet<>(filesToDeleteAfterScan);
++      filesToDeleteAfterScan.clear();
++      currLoc = Location.current(tablet.getTabletServer().getTabletSession());
++    }
++    removeFilesAfterScan(snapshot, currLoc);
++  }
++
 +  static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile> 
scanFiles,
 +      ServerContext context, Location currLocation) {
 +    try (var mutator = context.getAmple().conditionallyMutateTablets()) {
 +      var tabletMutator = 
mutator.mutateTablet(extent).requireLocation(currLocation);
 +
 +      scanFiles.forEach(tabletMutator::deleteScan);
 +
 +      tabletMutator
 +          .submit(tabletMetadata -> Collections.disjoint(scanFiles, 
tabletMetadata.getScans()));
 +
 +      var result = mutator.process().get(extent);
 +      Preconditions.checkState(result.getStatus() == 
Ample.ConditionalResult.Status.ACCEPTED,
 +          "Failed to remove scan file entries for %s", extent);
 +    }
 +  }
 +
 +  Pair<Long,Map<StoredTabletFile,DataFileValue>> reserveFilesForScan() {
 +    synchronized (tablet) {
 +
 +      var tabletsFiles = tablet.getDatafiles();
 +      Set<StoredTabletFile> absFilePaths = new 
HashSet<>(tabletsFiles.keySet());
 +
 +      long rid = nextScanReservationId++;
 +
 +      scanFileReservations.put(rid, absFilePaths);
 +
 +      Map<StoredTabletFile,DataFileValue> ret = new HashMap<>();
 +
 +      for (StoredTabletFile path : absFilePaths) {
 +        fileScanReferenceCounts.increment(path, 1);
 +        ret.put(path, tabletsFiles.get(path));
 +      }
 +
 +      return new Pair<>(rid, ret);
 +    }
 +  }
 +
 +  void returnFilesForScan(Long reservationId) {
 +
 +    final Set<StoredTabletFile> filesToDelete = new HashSet<>();
 +
 +    try {
 +      synchronized (tablet) {
 +        Set<StoredTabletFile> absFilePaths = 
scanFileReservations.remove(reservationId);
 +
 +        if (absFilePaths == null) {
 +          throw new IllegalArgumentException("Unknown scan reservation id " + 
reservationId);
 +        }
 +
 +        boolean notify = false;
 +        try {
 +          for (StoredTabletFile path : absFilePaths) {
 +            long refCount = fileScanReferenceCounts.decrement(path, 1);
 +            if (refCount == 0) {
 +              if (filesToDeleteAfterScan.remove(path)) {
 +                filesToDelete.add(path);
 +              }
 +              notify = true;
 +            } else if (refCount < 0) {
 +              throw new IllegalStateException("Scan ref count for " + path + 
" is " + refCount);
 +            }
 +          }
 +        } finally {
 +          if (notify) {
 +            tablet.notifyAll();
 +          }
 +        }
 +      }
 +    } finally {
 +      if (!filesToDelete.isEmpty()) {
 +        // Remove scan files even if the loop above did not fully complete 
because once a
 +        // file is in the set filesToDelete that means it was removed from 
filesToDeleteAfterScan
 +        // and would never be added back.
 +        log.debug("Removing scan refs from metadata {} {}", 
tablet.getExtent(), filesToDelete);
 +
 +        var currLoc = 
Location.current(tablet.getTabletServer().getTabletSession());
 +        removeScanFiles(tablet.getExtent(), filesToDelete, 
tablet.getContext(), currLoc);
 +      }
 +    }
 +  }
 +
 +  void removeFilesAfterScan(Collection<StoredTabletFile> scanFiles, Location 
location) {
 +    if (scanFiles.isEmpty()) {
 +      return;
 +    }
 +
 +    Set<StoredTabletFile> filesToDelete = new HashSet<>();
 +
 +    synchronized (tablet) {
 +      for (StoredTabletFile path : scanFiles) {
 +        if (fileScanReferenceCounts.get(path) == 0) {
 +          filesToDelete.add(path);
 +        } else {
 +          filesToDeleteAfterScan.add(path);
 +        }
 +      }
 +    }
 +
 +    if (!filesToDelete.isEmpty()) {
 +      log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), 
filesToDelete);
 +      removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), 
location);
 +    }
 +  }
 +}
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index e6283cd09b,e4d9f9787c..bb590f96c9
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -1493,9 -2213,17 +1493,13 @@@ public class Tablet extends TabletBase 
  
    @Override
    public void returnFilesForScan(long scanId) {
 -    getDatafileManager().returnFilesForScan(scanId);
 -  }
 -
 -  public MetadataUpdateCount getUpdateCount() {
 -    return getDatafileManager().getUpdateCount();
 +    getScanfileManager().returnFilesForScan(scanId);
    }
  
+   public void removeOrphanedScanRefs() {
 -    getDatafileManager().removeOrphanedScanRefs();
++    getScanfileManager().removeOrphanedScanRefs();
+   }
+ 
    TabletMemory getTabletMemory() {
      return tabletMemory;
    }

Reply via email to