This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit fa09c1a7f5a1e1cebe2d113c730af8478246ba09 Merge: 22c3e23695 b1e28302ae Author: Daniel Roberts ddanielr <ddani...@gmail.com> AuthorDate: Mon Jun 23 17:59:00 2025 +0000 Merge branch '2.1' Had to add location info to ScanfileManager.removeOrphanedScanRefs to compile code. .../core/spi/balancer/HostRegexTableLoadBalancer.java | 18 ++++++++++-------- .../manager/balancer/BalancerEnvironmentImpl.java | 2 +- .../java/org/apache/accumulo/tserver/TabletServer.java | 12 ++++++++++++ .../accumulo/tserver/tablet/ScanfileManager.java | 16 ++++++++++++++++ .../org/apache/accumulo/tserver/tablet/Tablet.java | 4 ++++ 5 files changed, 43 insertions(+), 9 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index 5d53209057,4637d8c610..3afd93886f --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@@ -370,12 -370,13 +370,13 @@@ public class HostRegexTableLoadBalance continue; } } - LOG.debug("Sending {} tablets to balancer for table {} for assignment within tservers {}", - e.getValue().size(), tableName, currentView.keySet()); + LOG.debug("Sending {} tablets to balancer for table {} for assignment within {} tservers", + e.getValue().size(), tableName, currentView.keySet().size()); + LOG.trace("table {} tserver assignment set {}", tableName, currentView.keySet()); assignmentTimer.restart(); - getBalancerForTable(e.getKey()) - .getAssignments(new AssignmentParamsImpl(currentView, e.getValue(), newAssignments)); + getBalancerForTable(e.getKey()).getAssignments(new AssignmentParamsImpl(currentView, + params.currentResourceGroups(), e.getValue(), newAssignments)); - LOG.trace("assignment results table:{} assignments:{} time:{}ms", tableName, + LOG.debug("assignment results table:{} assignments:{} time:{}ms", tableName, newAssignments.size(), 
assignmentTimer.elapsed(TimeUnit.MILLISECONDS)); newAssignments.forEach(params::addAssignment); } @@@ -519,10 -518,11 +520,11 @@@ } ArrayList<TabletMigration> newMigrations = new ArrayList<>(); balanceTimer.restart(); - getBalancerForTable(tableId) - .balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), migrations, - newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId))); - LOG.trace("balance results tableId:{} migrations:{} time:{}ms", tableId, newMigrations.size(), - TabletBalancer tableBalancer = getBalancerForTable(tableId); - tableBalancer.balance(new BalanceParamsImpl(currentView, migrations, newMigrations, - params.partitionName() + ":" + tableId, Map.of(tableName, tableId))); ++ TabletBalancer tabletBalancer = getBalancerForTable(tableId); ++ tabletBalancer.balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), ++ migrations, newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId))); + LOG.debug("balance results class:{} tableId:{} migrations:{} time:{}ms", - tableBalancer.getClass().getSimpleName(), tableId, newMigrations.size(), ++ tabletBalancer.getClass().getSimpleName(), tableId, newMigrations.size(), balanceTimer.elapsed(TimeUnit.MILLISECONDS)); if (newMigrations.isEmpty()) { diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java index 38ac1c2684,be6a904880..cdab458649 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/balancer/BalancerEnvironmentImpl.java @@@ -89,10 -86,9 +89,10 @@@ public class BalancerEnvironmentImpl ex @Override public List<TabletStatistics> listOnlineTabletsForTable(TabletServerId tabletServerId, TableId tableId) throws AccumuloException, AccumuloSecurityException { - log.debug("Scanning tablet server {} for table {}", tabletServerId, tableId); + log.trace("Scanning tablet server {} for table 
{}", tabletServerId, tableId); try { - TabletClientService.Client client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, + TabletServerClientService.Client client = ThriftUtil.getClient( + ThriftClientTypes.TABLET_SERVER, HostAndPort.fromParts(tabletServerId.getHost(), tabletServerId.getPort()), getContext()); try { return client diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1507b4bf9f,d5d411db50..49cb759299 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -293,8 -310,24 +293,20 @@@ public class TabletServer extends Abstr }), 5, 5, TimeUnit.SECONDS); watchNonCriticalScheduledTask(future); + ScheduledFuture<?> cleanupTask = + context.getScheduledExecutor().scheduleWithFixedDelay(Threads.createNamedRunnable( + "ScanRefCleanupTask", () -> getOnlineTablets().values().forEach(tablet -> { + try { + tablet.removeOrphanedScanRefs(); + } catch (Exception e) { + log.error("Error cleaning up stale scan references for tablet {}", + tablet.getExtent(), e); + } + })), 5, 5, TimeUnit.MINUTES); + watchNonCriticalScheduledTask(cleanupTask); + - @SuppressWarnings("deprecation") - final long walMaxSize = - aconf.getAsBytes(aconf.resolve(Property.TSERV_WAL_MAX_SIZE, Property.TSERV_WALOG_MAX_SIZE)); - @SuppressWarnings("deprecation") - final long walMaxAge = aconf - .getTimeInMillis(aconf.resolve(Property.TSERV_WAL_MAX_AGE, Property.TSERV_WALOG_MAX_AGE)); + final long walMaxSize = aconf.getAsBytes(Property.TSERV_WAL_MAX_SIZE); + final long walMaxAge = aconf.getTimeInMillis(Property.TSERV_WAL_MAX_AGE); final long minBlockSize = context.getHadoopConf().getLong("dfs.namenode.fs-limits.min-block-size", 0); if (minBlockSize != 0 && minBlockSize > walMaxSize) { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java index 88d08ec16c,0000000000..68ced7e711 mode 100644,000000..100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java @@@ -1,166 -1,0 +1,182 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +class ScanfileManager { + private final Logger log = LoggerFactory.getLogger(ScanfileManager.class); + private final Tablet tablet; + + ScanfileManager(Tablet tablet) { + this.tablet = tablet; + } + + private final Set<StoredTabletFile> filesToDeleteAfterScan = new HashSet<>(); + private final Map<Long,Set<StoredTabletFile>> scanFileReservations = new HashMap<>(); + private final MapCounter<StoredTabletFile> fileScanReferenceCounts = new MapCounter<>(); + private long nextScanReservationId = 0; + + static void rename(VolumeManager fs, Path src, Path dst) throws IOException { + if (!fs.rename(src, dst)) { + throw new IOException("Rename " + src + " to " + dst + " returned false "); + } + } + ++ /** ++ * Removes any scan-in-use metadata entries that were left behind when a scan cleanup was ++ * interrupted. Intended to be called periodically to clear these orphaned scan refs once their ++ * in-memory reference count reaches zero. 
++ */ ++ public void removeOrphanedScanRefs() { ++ Set<StoredTabletFile> snapshot; ++ Location currLoc; ++ synchronized (tablet) { ++ snapshot = new HashSet<>(filesToDeleteAfterScan); ++ filesToDeleteAfterScan.clear(); ++ currLoc = Location.current(tablet.getTabletServer().getTabletSession()); ++ } ++ removeFilesAfterScan(snapshot, currLoc); ++ } ++ + static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile> scanFiles, + ServerContext context, Location currLocation) { + try (var mutator = context.getAmple().conditionallyMutateTablets()) { + var tabletMutator = mutator.mutateTablet(extent).requireLocation(currLocation); + + scanFiles.forEach(tabletMutator::deleteScan); + + tabletMutator + .submit(tabletMetadata -> Collections.disjoint(scanFiles, tabletMetadata.getScans())); + + var result = mutator.process().get(extent); + Preconditions.checkState(result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED, + "Failed to remove scan file entries for %s", extent); + } + } + + Pair<Long,Map<StoredTabletFile,DataFileValue>> reserveFilesForScan() { + synchronized (tablet) { + + var tabletsFiles = tablet.getDatafiles(); + Set<StoredTabletFile> absFilePaths = new HashSet<>(tabletsFiles.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map<StoredTabletFile,DataFileValue> ret = new HashMap<>(); + + for (StoredTabletFile path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path, tabletsFiles.get(path)); + } + + return new Pair<>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { + + final Set<StoredTabletFile> filesToDelete = new HashSet<>(); + + try { + synchronized (tablet) { + Set<StoredTabletFile> absFilePaths = scanFileReservations.remove(reservationId); + + if (absFilePaths == null) { + throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); + } + + boolean notify = false; + try { + for (StoredTabletFile path : absFilePaths) { + long 
refCount = fileScanReferenceCounts.decrement(path, 1); + if (refCount == 0) { + if (filesToDeleteAfterScan.remove(path)) { + filesToDelete.add(path); + } + notify = true; + } else if (refCount < 0) { + throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + } + } finally { + if (notify) { + tablet.notifyAll(); + } + } + } + } finally { + if (!filesToDelete.isEmpty()) { + // Remove scan files even if the loop above did not fully complete because once a + // file is in the set filesToDelete that means it was removed from filesToDeleteAfterScan + // and would never be added back. + log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); + + var currLoc = Location.current(tablet.getTabletServer().getTabletSession()); + removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), currLoc); + } + } + } + + void removeFilesAfterScan(Collection<StoredTabletFile> scanFiles, Location location) { + if (scanFiles.isEmpty()) { + return; + } + + Set<StoredTabletFile> filesToDelete = new HashSet<>(); + + synchronized (tablet) { + for (StoredTabletFile path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) { + filesToDelete.add(path); + } else { + filesToDeleteAfterScan.add(path); + } + } + } + + if (!filesToDelete.isEmpty()) { + log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(), filesToDelete); + removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(), location); + } + } +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index e6283cd09b,e4d9f9787c..bb590f96c9 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -1493,9 -2213,17 +1493,13 @@@ public class Tablet extends TabletBase @Override public void returnFilesForScan(long scanId) { - getDatafileManager().returnFilesForScan(scanId); - } - - public 
MetadataUpdateCount getUpdateCount() { - return getDatafileManager().getUpdateCount(); + getScanfileManager().returnFilesForScan(scanId); } + public void removeOrphanedScanRefs() { - getDatafileManager().removeOrphanedScanRefs(); ++ getScanfileManager().removeOrphanedScanRefs(); + } + TabletMemory getTabletMemory() { return tabletMemory; }