This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 2c8947b90044344fb049a334388aa2fbf46271bb
Merge: 14b23a63fb 3061ff028d
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Dec 14 00:42:59 2023 -0500

    Merge branch 'main' into elasticity

 .../accumulo/core/tabletserver/log/LogEntry.java   | 36 +++++++++++-----------
 .../core/tabletserver/log/LogEntryTest.java        |  6 ++--
 .../server/constraints/MetadataConstraints.java    |  7 +++--
 .../org/apache/accumulo/server/fs/VolumeUtil.java  |  8 ++---
 .../accumulo/server/util/ListVolumesUsed.java      |  2 +-
 .../apache/accumulo/server/fs/VolumeUtilTest.java  |  6 ++--
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |  2 +-
 .../accumulo/manager/recovery/RecoveryManager.java |  2 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  2 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  2 +-
 .../accumulo/test/fate/zookeeper/FateIT.java       | 10 ++++--
 11 files changed, 45 insertions(+), 38 deletions(-)

diff --cc 
server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index 9fad119948,3579fdbc6e..dbf4daf4f5
--- 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@@ -224,9 -213,8 +224,10 @@@ public class MetadataConstraints implem
          continue;
        }
  
-       if (columnUpdate.getValue().length == 0 && 
!columnFamily.equals(ScanFileColumnFamily.NAME)
-           && !HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, 
columnQualifier)
-           && !columnFamily.equals(CompactedColumnFamily.NAME)) {
+       if (columnUpdate.getValue().length == 0 && 
!(columnFamily.equals(ScanFileColumnFamily.NAME)
 -          || columnFamily.equals(LogColumnFamily.NAME))) {
++          || columnFamily.equals(LogColumnFamily.NAME)
++          || HostingColumnFamily.REQUESTED_COLUMN.equals(columnFamily, 
columnQualifier)
++          || columnFamily.equals(CompactedColumnFamily.NAME))) {
          violations = addViolation(violations, 6);
        }
  
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 5c3ac18584,4021840869..140f23cae5
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@@ -130,59 -128,37 +130,59 @@@ public class VolumeUtil 
      }
    }
  
 -  /**
 -   * This method does two things. First, it switches any volumes a tablet is 
using that are
 -   * configured in instance.volumes.replacements. Second, if a tablet dir is 
no longer configured
 -   * for use it chooses a new tablet directory.
 -   */
 -  public static TabletFiles updateTabletVolumes(ServerContext context, 
ServiceLock zooLock,
 -      KeyExtent extent, TabletFiles tabletFiles) {
 -    List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
 +  public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> 
replacements,
 +      final TabletMetadata tm) {
      if (replacements.isEmpty()) {
 -      return tabletFiles;
 +      return false;
      }
 -    log.trace("Using volume replacements: {}", replacements);
  
 -    List<LogEntry> logsToRemove = new ArrayList<>();
 -    List<LogEntry> logsToAdd = new ArrayList<>();
 +    MutableBoolean needsReplacement = new MutableBoolean(false);
 +
 +    Consumer<LogEntry> consumer = le -> needsReplacement.setTrue();
 +
 +    volumeReplacementEvaluation(replacements, tm, consumer, consumer,
 +        f -> needsReplacement.setTrue(), (f, dfv) -> 
needsReplacement.setTrue());
 +
 +    return needsReplacement.booleanValue();
 +  }
  
 -    List<StoredTabletFile> filesToRemove = new ArrayList<>();
 -    SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new 
TreeMap<>();
 +  public static class VolumeReplacements {
 +    public final TabletMetadata tabletMeta;
 +    public final List<LogEntry> logsToRemove = new ArrayList<>();
 +    public final List<LogEntry> logsToAdd = new ArrayList<>();
 +    public final List<StoredTabletFile> filesToRemove = new ArrayList<>();
 +    public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new 
HashMap<>();
  
 -    TabletFiles ret = new TabletFiles();
 +    public VolumeReplacements(TabletMetadata tabletMeta) {
 +      this.tabletMeta = tabletMeta;
 +    }
 +  }
  
 -    for (LogEntry logEntry : tabletFiles.logEntries) {
 +  public static VolumeReplacements
 +      computeVolumeReplacements(final List<Pair<Path,Path>> replacements, 
final TabletMetadata tm) {
 +    var vr = new VolumeReplacements(tm);
 +    volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, 
vr.logsToAdd::add,
 +        vr.filesToRemove::add, vr.filesToAdd::put);
 +    return vr;
 +  }
 +
 +  public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> 
replacements,
 +      final TabletMetadata tm, final Consumer<LogEntry> logsToRemove,
 +      final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> 
filesToRemove,
 +      final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) {
 +    if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && 
tm.getLogs().isEmpty())) {
 +      return;
 +    }
 +
 +    log.trace("Using volume replacements: {}", replacements);
 +    for (LogEntry logEntry : tm.getLogs()) {
 +      log.trace("Evaluating walog {} for replacement.", logEntry);
        LogEntry switchedLogEntry = switchVolumes(logEntry, replacements);
        if (switchedLogEntry != null) {
 -        logsToRemove.add(logEntry);
 -        logsToAdd.add(switchedLogEntry);
 -        ret.logEntries.add(switchedLogEntry);
 -        log.debug("Replacing volume {} : {} -> {}", extent, 
logEntry.getFilePath(),
 +        logsToRemove.accept(logEntry);
 +        logsToAdd.accept(switchedLogEntry);
-         log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getLogReference(),
-             switchedLogEntry.getLogReference());
++        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getFilePath(),
+             switchedLogEntry.getFilePath());
 -      } else {
 -        ret.logEntries.add(logEntry);
        }
      }
  
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 3c533c2691,807ba9f690..493b64afb4
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo
        }
        // Tablet is being recovered and has WAL references, remove all the 
WALs for the dead server
        // that made the WALs.
 -      for (Collection<String> wals : state.walogs) {
 -        for (String wal : wals) {
 -          UUID walUUID = path2uuid(new Path(wal));
 -          TServerInstance dead = result.get(walUUID);
 -          // There's a reference to a log file, so skip that server's logs
 -          Set<UUID> idsToIgnore = candidates.remove(dead);
 -          if (idsToIgnore != null) {
 -            result.keySet().removeAll(idsToIgnore);
 -            recoveryLogs.keySet().removeAll(idsToIgnore);
 -          }
 +      for (LogEntry wals : tabletMetadata.getLogs()) {
-         String wal = wals.getLogReference();
++        String wal = wals.getFilePath();
 +        UUID walUUID = path2uuid(new Path(wal));
 +        TServerInstance dead = result.get(walUUID);
 +        // There's a reference to a log file, so skip that server's logs
 +        Set<UUID> idsToIgnore = candidates.remove(dead);
 +        if (idsToIgnore != null) {
 +          result.keySet().removeAll(idsToIgnore);
 +          recoveryLogs.keySet().removeAll(idsToIgnore);
          }
        }
      }
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 3c8d7922f2,86bc06930c..9cb41a90c1
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager 
      }
    }
  
 -  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> 
walogs)
 -      throws IOException {
 +  public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) 
throws IOException {
      boolean recoveryNeeded = false;
  
 -    for (Collection<String> logs : walogs) {
 -      for (String walog : logs) {
 -
 -        Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), 
FileType.WAL,
 -            manager.getContext().getVolumeReplacements());
 -        if (switchedWalog != null) {
 -          // replaces the volume used for sorting, but do not change entry in 
metadata table. When
 -          // the tablet loads it will change the metadata table entry. If
 -          // the tablet has the same replacement config, then it will find 
the sorted log.
 -          log.info("Volume replaced {} -> {}", walog, switchedWalog);
 -          walog = switchedWalog.toString();
 -        }
 +    for (LogEntry entry : walogs) {
-       String walog = entry.getLogReference();
++      String walog = entry.getFilePath();
 +
 +      Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), 
FileType.WAL,
 +          manager.getContext().getVolumeReplacements());
 +      if (switchedWalog != null) {
 +        // replaces the volume used for sorting, but do not change entry in 
metadata table. When
 +        // the tablet loads it will change the metadata table entry. If
 +        // the tablet has the same replacement config, then it will find the 
sorted log.
 +        log.info("Volume replaced {} -> {}", walog, switchedWalog);
 +        walog = switchedWalog.toString();
 +      }
  
 -        String[] parts = walog.split("/");
 -        String sortId = parts[parts.length - 1];
 -        String filename = new Path(walog).toString();
 -        String dest = RecoveryPath.getRecoveryPath(new 
Path(filename)).toString();
 +      String[] parts = walog.split("/");
 +      String sortId = parts[parts.length - 1];
 +      String filename = new Path(walog).toString();
 +      String dest = RecoveryPath.getRecoveryPath(new 
Path(filename)).toString();
 +
 +      boolean sortQueued;
 +      synchronized (this) {
 +        sortQueued = sortsQueued.contains(sortId);
 +      }
  
 -        boolean sortQueued;
 +      if (sortQueued
 +          && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + 
"/" + sortId)
 +              == null) {
          synchronized (this) {
 -          sortQueued = sortsQueued.contains(sortId);
 +          sortsQueued.remove(sortId);
          }
 +      }
  
 -        if (sortQueued
 -            && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY 
+ "/" + sortId)
 -                == null) {
 -          synchronized (this) {
 -            sortsQueued.remove(sortId);
 -          }
 +      if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 +        synchronized (this) {
 +          closeTasksQueued.remove(sortId);
 +          recoveryDelay.remove(sortId);
 +          sortsQueued.remove(sortId);
          }
 +        continue;
 +      }
  
 -        if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
 -          synchronized (this) {
 -            closeTasksQueued.remove(sortId);
 -            recoveryDelay.remove(sortId);
 -            sortsQueued.remove(sortId);
 +      recoveryNeeded = true;
 +      synchronized (this) {
 +        if (!closeTasksQueued.contains(sortId) && 
!sortsQueued.contains(sortId)) {
 +          AccumuloConfiguration aconf = manager.getConfiguration();
 +          LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 +              Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, 
new HadoopLogCloser());
 +          Long delay = recoveryDelay.get(sortId);
 +          if (delay == null) {
 +            delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 +          } else {
 +            delay = Math.min(2 * delay, 1000 * 60 * 5L);
            }
 -          continue;
 -        }
  
 -        recoveryNeeded = true;
 -        synchronized (this) {
 -          if (!closeTasksQueued.contains(sortId) && 
!sortsQueued.contains(sortId)) {
 -            AccumuloConfiguration aconf = manager.getConfiguration();
 -            LogCloser closer = Property.createInstanceFromPropertyName(aconf,
 -                Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, 
new HadoopLogCloser());
 -            Long delay = recoveryDelay.get(sortId);
 -            if (delay == null) {
 -              delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
 -            } else {
 -              delay = Math.min(2 * delay, 1000 * 60 * 5L);
 -            }
 -
 -            log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 -                (delay / 1000), extent);
 -
 -            ScheduledFuture<?> future = executor.schedule(
 -                new LogSortTask(closer, filename, dest, sortId), delay, 
TimeUnit.MILLISECONDS);
 -            ThreadPools.watchNonCriticalScheduledTask(future);
 -            closeTasksQueued.add(sortId);
 -            recoveryDelay.put(sortId, delay);
 -          }
 +          log.info("Starting recovery of {} (in : {}s), tablet {} holds a 
reference", filename,
 +              (delay / 1000), extent);
 +
 +          ScheduledFuture<?> future = executor.schedule(
 +              new LogSortTask(closer, filename, dest, sortId), delay, 
TimeUnit.MILLISECONDS);
 +          ThreadPools.watchNonCriticalScheduledTask(future);
 +          closeTasksQueued.add(sortId);
 +          recoveryDelay.put(sortId, delay);
          }
        }
      }
diff --cc test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index 5091d0be0b,d4da1ebae4..b7ae2a13ad
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@@ -272,11 -276,10 +276,11 @@@ public class FateIT 
        long txid = fate.startTransaction();
        LOG.debug("Starting test testCancelWhileSubmitted with {}", 
FateTxId.formatTid(txid));
        assertEquals(NEW, getTxStatus(zk, txid));
-       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
-       assertEquals(SUBMITTED, getTxStatus(zk, txid));
+       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
false, "Test Op");
+       Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid));
        // This is false because the transaction runner has reserved the FaTe
        // transaction.
 +      Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid));
        assertFalse(fate.cancel(txid));
        callStarted.await();
        finishCall.countDown();

Reply via email to