This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 72110813107f14949bc082a109b4e35b06ea39ad
Merge: bd024372a1 ba296913b4
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 28 18:31:40 2024 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/core/fate/AdminUtil.java   |  62 +++++++-----
 .../accumulo/core/file/BloomFilterLayer.java       |   6 ++
 .../apache/accumulo/core/file/FileSKVIterator.java |  11 +++
 .../core/file/rfile/MultiIndexIterator.java        |   6 ++
 .../org/apache/accumulo/core/file/rfile/RFile.java |  60 ++++++++++--
 .../iteratorsImpl/system/SequenceFileIterator.java |   6 ++
 .../org/apache/accumulo/compactor/Compactor.java   |  38 +++++++-
 .../org/apache/accumulo/test/FateSummaryIT.java    | 105 ++++++++++++++++++++-
 .../compaction/ExternalCompactionProgressIT.java   | 103 ++++++++++++++++++--
 9 files changed, 358 insertions(+), 39 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 7cc0a9c004,b7afea1b84..6a5873f2b3
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -341,37 -341,57 +341,57 @@@ public class AdminUtil<T> 
      List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
  
      for (Long tid : transactions) {
+       try {
+         zs.reserve(tid);
  
-       zs.reserve(tid);
- 
-       String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
+         String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
  
-       List<String> hlocks = heldLocks.remove(tid);
+         List<String> hlocks = heldLocks.remove(tid);
  
-       if (hlocks == null) {
-         hlocks = Collections.emptyList();
-       }
+         if (hlocks == null) {
+           hlocks = Collections.emptyList();
+         }
  
-       List<String> wlocks = waitingLocks.remove(tid);
+         List<String> wlocks = waitingLocks.remove(tid);
  
-       if (wlocks == null) {
-         wlocks = Collections.emptyList();
-       }
+         if (wlocks == null) {
+           wlocks = Collections.emptyList();
+         }
  
-       String top = null;
-       ReadOnlyRepo<T> repo = zs.top(tid);
-       if (repo != null) {
-         top = repo.getName();
-       }
+         String top = null;
+         ReadOnlyRepo<T> repo = zs.top(tid);
+         if (repo != null) {
+           top = repo.getName();
+         }
  
-       TStatus status = zs.getStatus(tid);
+         TStatus status = zs.getStatus(tid);
  
-       long timeCreated = zs.timeCreated(tid);
+         long timeCreated = zs.timeCreated(tid);
  
-       zs.unreserve(tid, Duration.ZERO);
 -        zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++        zs.unreserve(tid, Duration.ZERO);
  
-       if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
-         statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
+         if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
+           statuses
+               .add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
+         }
+       } catch (Exception e) {
+         // If the cause of the Exception is a NoNodeException, it should be ignored as this
+         // indicates the transaction has completed between the time the list of transactions was
+         // acquired and the time the transaction was probed for info.
+         boolean nne = false;
+         Throwable cause = e;
+         while (cause != null) {
+           if (cause instanceof KeeperException.NoNodeException) {
+             nne = true;
+             break;
+           }
+           cause = cause.getCause();
+         }
+         if (!nne) {
+           throw e;
+         }
+         log.debug("Tried to get info on a since completed transaction - ignoring "
+             + FateTxId.formatTid(tid));
        }
      }
  
diff --cc core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
index 65c0c0bd42,23da429535..b55f1bab69
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
@@@ -21,6 -21,8 +21,7 @@@ package org.apache.accumulo.core.file
  import java.io.DataInputStream;
  import java.io.IOException;
  
 -import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
  import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
  import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c99d137b22,059eac98ca..2772e91659
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -1165,9 -1159,14 +1166,14 @@@ public class RFile 
      public void setCacheProvider(CacheProvider cacheProvider) {
        throw new UnsupportedOperationException();
      }
+ 
+     @Override
+     public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
+       throw new UnsupportedOperationException();
+     }
    }
  
 -  public static class Reader extends HeapIterator implements FileSKVIterator {
 +  public static class Reader extends HeapIterator implements RFileSKVIterator {
  
      private final CachableBlockFile.Reader reader;
  
@@@ -1565,229 -1562,34 +1571,269 @@@
        reader.setCacheProvider(cacheProvider);
      }
  
+     @Override
+     public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
+       long totalEntries = 0;
+       Key startKey = extent.toDataRange().getStartKey();
+       IndexEntry indexEntry;
+ 
+       for (LocalityGroupReader lgr : currentReaders) {
+         boolean prevEntryOverlapped = false;
+         var indexIter = startKey == null ? lgr.getIndex() : lgr.index.lookup(startKey);
+ 
+         while (indexIter.hasNext()) {
+           indexEntry = indexIter.next();
+           if (extent.contains(indexEntry.getKey().getRow())) {
+             totalEntries += indexEntry.getNumEntries();
+             prevEntryOverlapped = true;
+           } else if (prevEntryOverlapped) {
+             // The last index entry included in the count is the one after the last contained by the
+             // extent. This is because it is possible for the extent to overlap this index entry
+             // but there is no way to check whether it does or not. The index entry only contains
+             // info about the last key, but the extent may overlap but not with the last key.
+             totalEntries += indexEntry.getNumEntries();
+             prevEntryOverlapped = false;
+             break;
+           }
+         }
+       }
+ 
+       return totalEntries;
+     }
++
 +    @Override
 +    public void reset() {
 +      clear();
 +    }
 +  }
 +
 +  public interface RFileSKVIterator extends FileSKVIterator {
 +    FileSKVIterator getIndex() throws IOException;
 +
 +    void reset();
 +  }
 +
 +  static abstract class FencedFileSKVIterator implements FileSKVIterator {
 +
 +    private final FileSKVIterator reader;
 +    protected final Range fence;
 +    private final Key fencedStartKey;
 +    private final Supplier<Key> fencedEndKey;
 +
 +    public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
 +      this.reader = Objects.requireNonNull(reader);
 +      this.fence = Objects.requireNonNull(fence);
 +      this.fencedStartKey = fence.getStartKey();
 +      this.fencedEndKey = Suppliers.memoize(() -> getEndKey(fence.getEndKey()));
 +    }
 +
 +    @Override
 +    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
 +        IteratorEnvironment env) throws IOException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public boolean hasTop() {
 +      return reader.hasTop();
 +    }
 +
 +    @Override
 +    public void next() throws IOException {
 +      reader.next();
 +    }
 +
 +    @Override
 +    public Key getTopKey() {
 +      return reader.getTopKey();
 +    }
 +
 +    @Override
 +    public Value getTopValue() {
 +      return reader.getTopValue();
 +    }
 +
 +    @Override
 +    public Text getFirstRow() throws IOException {
 +      var row = reader.getFirstRow();
 +      if (row != null && fence.beforeStartKey(new Key(row))) {
 +        return fencedStartKey.getRow();
 +      } else {
 +        return row;
 +      }
 +    }
 +
 +    @Override
 +    public Text getLastRow() throws IOException {
 +      var row = reader.getLastRow();
 +      if (row != null && fence.afterEndKey(new Key(row))) {
 +        return fencedEndKey.get().getRow();
 +      } else {
 +        return row;
 +      }
 +    }
 +
 +    @Override
 +    public boolean isRunningLowOnMemory() {
 +      return reader.isRunningLowOnMemory();
 +    }
 +
 +    @Override
 +    public void setInterruptFlag(AtomicBoolean flag) {
 +      reader.setInterruptFlag(flag);
 +    }
 +
 +    @Override
 +    public DataInputStream getMetaStore(String name) throws IOException {
 +      return reader.getMetaStore(name);
 +    }
 +
 +    @Override
 +    public void closeDeepCopies() throws IOException {
 +      reader.closeDeepCopies();
 +    }
 +
 +    @Override
 +    public void setCacheProvider(CacheProvider cacheProvider) {
 +      reader.setCacheProvider(cacheProvider);
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +      reader.close();
 +    }
 +
 +    private Key getEndKey(Key key) {
 +      // If they key is infinite it will be null or if inclusive we can just use it as is
 +      // as it would be the correct value for getLastKey()
 +      if (fence.isInfiniteStopKey() || fence.isEndKeyInclusive()) {
 +        return key;
 +      }
 +
 +      // If exclusive we need to strip the last byte to get the last key that is part of the
 +      // actual range to return
 +      final byte[] ba = key.getRow().getBytes();
 +      Preconditions.checkArgument(ba.length > 0 && ba[ba.length - 1] == (byte) 0x00);
 +      byte[] fba = new byte[ba.length - 1];
 +      System.arraycopy(ba, 0, fba, 0, ba.length - 1);
 +
 +      return new Key(fba);
 +    }
 +
 +  }
 +
 +  static class FencedIndex extends FencedFileSKVIterator {
 +    private final FileSKVIterator source;
 +
 +    public FencedIndex(FileSKVIterator source, Range seekFence) {
 +      super(source, seekFence);
 +      this.source = source;
 +    }
 +
 +    @Override
 +    public boolean hasTop() {
 +      // this code filters out data because the rfile index iterators do not support seek
 +
 +      // If startKey is set then discard everything until we reach the start
 +      // of the range
 +      if (fence.getStartKey() != null) {
 +
 +        while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
 +          try {
 +            source.next();
 +          } catch (IOException e) {
 +            throw new UncheckedIOException(e);
 +          }
 +        }
 +      }
 +
 +      // If endKey is set then ensure that the current key is not passed the end of the range
 +      return source.hasTop() && !fence.afterEndKey(source.getTopKey());
 +    }
 +
++    @Override
++    public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
++      throw new UnsupportedOperationException();
++    }
++
 +    @Override
 +    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
 +        throws IOException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
 +      throw new UnsupportedOperationException();
 +    }
 +  }
 +
 +  static class FencedReader extends FencedFileSKVIterator implements RFileSKVIterator {
 +
 +    private final Reader reader;
 +
 +    public FencedReader(Reader reader, Range seekFence) {
 +      super(reader, seekFence);
 +      this.reader = reader;
 +    }
 +
 +    @Override
 +    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
 +        throws IOException {
 +      reader.reset();
 +
 +      if (fence != null) {
 +        range = fence.clip(range, true);
 +        if (range == null) {
 +          return;
 +        }
 +      }
 +
 +      reader.seek(range, columnFamilies, inclusive);
 +    }
 +
 +    @Override
 +    public FencedReader deepCopy(IteratorEnvironment env) {
 +      return new FencedReader(reader.deepCopy(env), fence);
 +    }
 +
 +    @Override
 +    public FileSKVIterator getIndex() throws IOException {
 +      return new FencedIndex(reader.getIndex(), fence);
 +    }
 +
++    @Override
++    public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
++      return reader.estimateOverlappingEntries(extent);
++    }
++
 +    @Override
 +    public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
 +      final Reader sample = reader.getSample(sampleConfig);
 +      return sample != null ? new FencedReader(sample, fence) : null;
 +    }
 +
 +    @Override
 +    public void reset() {
 +      reader.reset();
 +    }
 +  }
 +
 +  public static RFileSKVIterator getReader(final CachableBuilder cb, final TabletFile dataFile)
 +      throws IOException {
 +    final RFile.Reader reader = new RFile.Reader(Objects.requireNonNull(cb));
 +    return dataFile.hasRange() ? new FencedReader(reader, dataFile.getRange()) : reader;
 +  }
 +
 +  public static RFileSKVIterator getReader(final CachableBuilder cb, Range range)
 +      throws IOException {
 +    final RFile.Reader reader = new RFile.Reader(Objects.requireNonNull(cb));
 +    return !range.isInfiniteStartKey() || !range.isInfiniteStopKey()
 +        ? new FencedReader(reader, range) : reader;
    }
  }
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ebd195a22f,2b85401516..fbba00bab6
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -18,13 -18,14 +18,14 @@@
   */
  package org.apache.accumulo.compactor;
  
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.concurrent.TimeUnit.MINUTES;
 -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
  import java.net.UnknownHostException;
 -import java.security.SecureRandom;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
@@@ -61,18 -61,16 +62,20 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.data.NamespaceId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+ import org.apache.accumulo.core.file.FileOperations;
+ import org.apache.accumulo.core.file.FileSKVIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
 -import org.apache.accumulo.core.metadata.TabletFile;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@@ -105,7 -108,7 +109,8 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TServerUtils;
  import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.thrift.TException;
  import org.apache.thrift.transport.TTransportException;
@@@ -537,6 -555,27 +548,27 @@@ public class Compactor extends Abstract
      };
    }
  
+   /**
+    * @param extent the extent
+    * @param file the file to read from
+    * @param tableConf the table configuration
+    * @param cryptoService the crypto service
+    * @return an estimate of the number of key/value entries in the file that overlap the extent
+    */
+   private long estimateOverlappingEntries(KeyExtent extent, StoredTabletFile file,
+       AccumuloConfiguration tableConf, CryptoService cryptoService) {
+     FileOperations fileFactory = FileOperations.getInstance();
+     FileSystem fs = getContext().getVolumeManager().getFileSystemByPath(file.getPath());
+ 
+     try (FileSKVIterator reader =
 -        fileFactory.newReaderBuilder().forFile(file.getPathStr(), fs, fs.getConf(), cryptoService)
++        fileFactory.newReaderBuilder().forFile(file, fs, fs.getConf(), cryptoService)
+             .withTableConfiguration(tableConf).dropCachesBehind().build()) {
+       return reader.estimateOverlappingEntries(extent);
+     } catch (IOException ioe) {
+       throw new UncheckedIOException(ioe);
+     }
+   }
+ 
    /**
     * Returns the number of seconds to wait in between progress checks based on input file sizes
     *

Reply via email to