This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 2c8947b90044344fb049a334388aa2fbf46271bb Merge: 14b23a63fb 3061ff028d Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Dec 14 00:42:59 2023 -0500 Merge branch 'main' into elasticity .../accumulo/core/tabletserver/log/LogEntry.java | 36 +++++++++++----------- .../core/tabletserver/log/LogEntryTest.java | 6 ++-- .../server/constraints/MetadataConstraints.java | 7 +++-- .../org/apache/accumulo/server/fs/VolumeUtil.java | 8 ++--- .../accumulo/server/util/ListVolumesUsed.java | 2 +- .../apache/accumulo/server/fs/VolumeUtilTest.java | 6 ++-- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 2 +- .../accumulo/manager/recovery/RecoveryManager.java | 2 +- .../org/apache/accumulo/tserver/TabletServer.java | 2 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- .../accumulo/test/fate/zookeeper/FateIT.java | 10 ++++-- 11 files changed, 45 insertions(+), 38 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 9fad119948,3579fdbc6e..dbf4daf4f5 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@@ -224,9 -213,8 +224,10 @@@ public class MetadataConstraints implem continue; } - if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME) - && !HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier) - && !columnFamily.equals(CompactedColumnFamily.NAME)) { + if (columnUpdate.getValue().length == 0 && !(columnFamily.equals(ScanFileColumnFamily.NAME) - || columnFamily.equals(LogColumnFamily.NAME))) { ++ || columnFamily.equals(LogColumnFamily.NAME) ++ || HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, columnQualifier) ++ || columnFamily.equals(CompactedColumnFamily.NAME))) { violations = addViolation(violations, 6); } diff --cc 
server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 5c3ac18584,4021840869..140f23cae5 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@@ -130,59 -128,37 +130,59 @@@ public class VolumeUtil } } - /** - * This method does two things. First, it switches any volumes a tablet is using that are - * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured - * for use it chooses a new tablet directory. - */ - public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock, - KeyExtent extent, TabletFiles tabletFiles) { - List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm) { if (replacements.isEmpty()) { - return tabletFiles; + return false; } - log.trace("Using volume replacements: {}", replacements); - List<LogEntry> logsToRemove = new ArrayList<>(); - List<LogEntry> logsToAdd = new ArrayList<>(); + MutableBoolean needsReplacement = new MutableBoolean(false); + + Consumer<LogEntry> consumer = le -> needsReplacement.setTrue(); + + volumeReplacementEvaluation(replacements, tm, consumer, consumer, + f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue()); + + return needsReplacement.booleanValue(); + } - List<StoredTabletFile> filesToRemove = new ArrayList<>(); - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>(); + public static class VolumeReplacements { + public final TabletMetadata tabletMeta; + public final List<LogEntry> logsToRemove = new ArrayList<>(); + public final List<LogEntry> logsToAdd = new ArrayList<>(); + public final List<StoredTabletFile> filesToRemove = new ArrayList<>(); + public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>(); - TabletFiles ret = new 
TabletFiles(); + public VolumeReplacements(TabletMetadata tabletMeta) { + this.tabletMeta = tabletMeta; + } + } - for (LogEntry logEntry : tabletFiles.logEntries) { + public static VolumeReplacements + computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) { + var vr = new VolumeReplacements(tm); + volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add, + vr.filesToRemove::add, vr.filesToAdd::put); + return vr; + } + + public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm, final Consumer<LogEntry> logsToRemove, + final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove, + final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) { + if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) { + return; + } + + log.trace("Using volume replacements: {}", replacements); + for (LogEntry logEntry : tm.getLogs()) { + log.trace("Evaluating walog {} for replacement.", logEntry); LogEntry switchedLogEntry = switchVolumes(logEntry, replacements); if (switchedLogEntry != null) { - logsToRemove.add(logEntry); - logsToAdd.add(switchedLogEntry); - ret.logEntries.add(switchedLogEntry); - log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getFilePath(), + logsToRemove.accept(logEntry); + logsToAdd.accept(switchedLogEntry); - log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getLogReference(), - switchedLogEntry.getLogReference()); ++ log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(), + switchedLogEntry.getFilePath()); - } else { - ret.logEntries.add(logEntry); } } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 3c533c2691,807ba9f690..493b64afb4 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo } // Tablet is being recovered and has WAL references, remove all the WALs for the dead server // that made the WALs. - for (Collection<String> wals : state.walogs) { - for (String wal : wals) { - UUID walUUID = path2uuid(new Path(wal)); - TServerInstance dead = result.get(walUUID); - // There's a reference to a log file, so skip that server's logs - Set<UUID> idsToIgnore = candidates.remove(dead); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); - } + for (LogEntry wals : tabletMetadata.getLogs()) { - String wal = wals.getLogReference(); ++ String wal = wals.getFilePath(); + UUID walUUID = path2uuid(new Path(wal)); + TServerInstance dead = result.get(walUUID); + // There's a reference to a log file, so skip that server's logs + Set<UUID> idsToIgnore = candidates.remove(dead); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); } } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 3c8d7922f2,86bc06930c..9cb41a90c1 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager } } - public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) - throws IOException { + public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws IOException { boolean recoveryNeeded = false; - for (Collection<String> logs : walogs) { - for (String walog : logs) { - - Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, - manager.getContext().getVolumeReplacements()); - if (switchedWalog != null) { - 
// replaces the volume used for sorting, but do not change entry in metadata table. When - // the tablet loads it will change the metadata table entry. If - // the tablet has the same replacement config, then it will find the sorted log. - log.info("Volume replaced {} -> {}", walog, switchedWalog); - walog = switchedWalog.toString(); - } + for (LogEntry entry : walogs) { - String walog = entry.getLogReference(); ++ String walog = entry.getFilePath(); + + Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, + manager.getContext().getVolumeReplacements()); + if (switchedWalog != null) { + // replaces the volume used for sorting, but do not change entry in metadata table. When + // the tablet loads it will change the metadata table entry. If + // the tablet has the same replacement config, then it will find the sorted log. + log.info("Volume replaced {} -> {}", walog, switchedWalog); + walog = switchedWalog.toString(); + } - String[] parts = walog.split("/"); - String sortId = parts[parts.length - 1]; - String filename = new Path(walog).toString(); - String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + String[] parts = walog.split("/"); + String sortId = parts[parts.length - 1]; + String filename = new Path(walog).toString(); + String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + + boolean sortQueued; + synchronized (this) { + sortQueued = sortsQueued.contains(sortId); + } - boolean sortQueued; + if (sortQueued + && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) + == null) { synchronized (this) { - sortQueued = sortsQueued.contains(sortId); + sortsQueued.remove(sortId); } + } - if (sortQueued - && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) - == null) { - synchronized (this) { - sortsQueued.remove(sortId); - } + if (exists(SortedLogState.getFinishedMarkerPath(dest))) { + synchronized (this) { + closeTasksQueued.remove(sortId); + 
recoveryDelay.remove(sortId); + sortsQueued.remove(sortId); } + continue; + } - if (exists(SortedLogState.getFinishedMarkerPath(dest))) { - synchronized (this) { - closeTasksQueued.remove(sortId); - recoveryDelay.remove(sortId); - sortsQueued.remove(sortId); + recoveryNeeded = true; + synchronized (this) { + if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { + AccumuloConfiguration aconf = manager.getConfiguration(); + LogCloser closer = Property.createInstanceFromPropertyName(aconf, + Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); + Long delay = recoveryDelay.get(sortId); + if (delay == null) { + delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); + } else { + delay = Math.min(2 * delay, 1000 * 60 * 5L); } - continue; - } - recoveryNeeded = true; - synchronized (this) { - if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { - AccumuloConfiguration aconf = manager.getConfiguration(); - LogCloser closer = Property.createInstanceFromPropertyName(aconf, - Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(sortId); - if (delay == null) { - delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); - } else { - delay = Math.min(2 * delay, 1000 * 60 * 5L); - } - - log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, - (delay / 1000), extent); - - ScheduledFuture<?> future = executor.schedule( - new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - closeTasksQueued.add(sortId); - recoveryDelay.put(sortId, delay); - } + log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, + (delay / 1000), extent); + + ScheduledFuture<?> future = executor.schedule( + new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + 
ThreadPools.watchNonCriticalScheduledTask(future); + closeTasksQueued.add(sortId); + recoveryDelay.put(sortId, delay); } } } diff --cc test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 5091d0be0b,d4da1ebae4..b7ae2a13ad --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@@ -272,11 -276,10 +276,11 @@@ public class FateIT long txid = fate.startTransaction(); LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); assertEquals(NEW, getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - assertEquals(SUBMITTED, getTxStatus(zk, txid)); + fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), false, "Test Op"); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid)); // This is false because the transaction runner has reserved the FaTe // transaction. + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid)); assertFalse(fate.cancel(txid)); callStarted.await(); finishCall.countDown();