This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 72110813107f14949bc082a109b4e35b06ea39ad
Merge: bd024372a1 ba296913b4
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 28 18:31:40 2024 +0000

    Merge branch '2.1'

 .../org/apache/accumulo/core/fate/AdminUtil.java   |  62 +++++++-----
 .../accumulo/core/file/BloomFilterLayer.java       |   6 ++
 .../apache/accumulo/core/file/FileSKVIterator.java |  11 +++
 .../core/file/rfile/MultiIndexIterator.java        |   6 ++
 .../org/apache/accumulo/core/file/rfile/RFile.java |  60 ++++++++++-
 .../iteratorsImpl/system/SequenceFileIterator.java |   6 ++
 .../org/apache/accumulo/compactor/Compactor.java   |  38 +++++++-
 .../org/apache/accumulo/test/FateSummaryIT.java    | 105 ++++++++++++++++++++-
 .../compaction/ExternalCompactionProgressIT.java   | 103 ++++++++++++++++++--
 9 files changed, 358 insertions(+), 39 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 7cc0a9c004,b7afea1b84..6a5873f2b3
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -341,37 -341,57 +341,57 @@@ public class AdminUtil<T>
      List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
  
      for (Long tid : transactions) {
+       try {
+         zs.reserve(tid);
-       zs.reserve(tid);
- 
-       String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
+         String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
  
-       List<String> hlocks = heldLocks.remove(tid);
+         List<String> hlocks = heldLocks.remove(tid);
  
-       if (hlocks == null) {
-         hlocks = Collections.emptyList();
-       }
+         if (hlocks == null) {
+           hlocks = Collections.emptyList();
+         }
  
-       List<String> wlocks = waitingLocks.remove(tid);
+         List<String> wlocks = waitingLocks.remove(tid);
  
-       if (wlocks == null) {
-         wlocks = Collections.emptyList();
-       }
+         if (wlocks == null) {
+           wlocks = Collections.emptyList();
+         }
  
-       String top = null;
-       ReadOnlyRepo<T> repo = zs.top(tid);
-       if (repo != null) {
-         top = repo.getName();
-       }
+         String top = null;
+         ReadOnlyRepo<T> repo = zs.top(tid);
+         if (repo != null) {
+           top = repo.getName();
+         }
  
-       TStatus status = zs.getStatus(tid);
+         TStatus status = zs.getStatus(tid);
  
-       long timeCreated = zs.timeCreated(tid);
+         long timeCreated = zs.timeCreated(tid);
  
 -      zs.unreserve(tid, Duration.ZERO);
-       zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++        zs.unreserve(tid, Duration.ZERO);
  
-       if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
-         statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
+         if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
+           statuses
+               .add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
+         }
+       } catch (Exception e) {
+         // If the cause of the Exception is a NoNodeException, it should be ignored as this
+         // indicates the transaction has completed between the time the list of transactions was
+         // acquired and the time the transaction was probed for info.
+         boolean nne = false;
+         Throwable cause = e;
+         while (cause != null) {
+           if (cause instanceof KeeperException.NoNodeException) {
+             nne = true;
+             break;
+           }
+           cause = cause.getCause();
+         }
+         if (!nne) {
+           throw e;
+         }
+         log.debug("Tried to get info on a since completed transaction - ignoring "
+             + FateTxId.formatTid(tid));
+       }
      }
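For context: the new catch block above walks the exception's cause chain by hand because the NoNodeException may be wrapped at any depth. A minimal standalone sketch of that unwrapping pattern, assuming only the ZooKeeper client library (the helper class is illustrative, not part of this commit):

    import org.apache.zookeeper.KeeperException;

    final class CauseChains {
      // Walks t and its causes; true if any link in the chain is a
      // ZooKeeper NoNodeException.
      static boolean causedByNoNode(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
          if (cause instanceof KeeperException.NoNodeException) {
            return true;
          }
        }
        return false;
      }
    }

Guava's Throwables.getCausalChain would give the same walk, but the hand-rolled loop avoids allocating a list for what is usually a short chain.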
diff --cc core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
index 65c0c0bd42,23da429535..b55f1bab69
--- a/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileSKVIterator.java
@@@ -21,6 -21,8 +21,7 @@@ package org.apache.accumulo.core.file
  import java.io.DataInputStream;
  import java.io.IOException;
  
 -import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
  import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
  import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c99d137b22,059eac98ca..2772e91659
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@@ -1165,9 -1159,14 +1166,14 @@@ public class RFile
      public void setCacheProvider(CacheProvider cacheProvider) {
        throw new UnsupportedOperationException();
      }
+ 
+     @Override
+     public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
+       throw new UnsupportedOperationException();
+     }
    }
  
-   public static class Reader extends HeapIterator implements FileSKVIterator {
+   public static class Reader extends HeapIterator implements RFileSKVIterator {
  
      private final CachableBlockFile.Reader reader;
  
@@@ -1565,229 -1562,34 +1571,269 @@@
        reader.setCacheProvider(cacheProvider);
      }
  
+     @Override
+     public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
+       long totalEntries = 0;
+       Key startKey = extent.toDataRange().getStartKey();
+       IndexEntry indexEntry;
+ 
+       for (LocalityGroupReader lgr : currentReaders) {
+         boolean prevEntryOverlapped = false;
+         var indexIter = startKey == null ? lgr.getIndex() : lgr.index.lookup(startKey);
+ 
+         while (indexIter.hasNext()) {
+           indexEntry = indexIter.next();
+           if (extent.contains(indexEntry.getKey().getRow())) {
+             totalEntries += indexEntry.getNumEntries();
+             prevEntryOverlapped = true;
+           } else if (prevEntryOverlapped) {
+             // The last index entry included in the count is the one after the last entry
+             // contained by the extent. It is possible for the extent to overlap this index
+             // entry, but there is no way to check whether it does: the index entry only
+             // records its last key, and the extent may overlap the entry without containing
+             // that last key.
+             totalEntries += indexEntry.getNumEntries();
+             prevEntryOverlapped = false;
+             break;
+           }
+         }
+       }
+ 
+       return totalEntries;
+     }
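For context: the estimate above deliberately over-counts by one index entry. An index entry records only its last key, so when the walk leaves the extent, the first non-contained entry may still hold overlapping keys and is added before breaking. A toy model of the same accounting over a plain sorted index (all names here are illustrative, not Accumulo API):

    import java.util.List;

    final class OverlapEstimateDemo {
      // Stand-in for an RFile index entry: the entry's last row and how many
      // key/value pairs it summarizes.
      record Entry(String lastRow, long numEntries) {}

      // Counts entries whose last row falls in [startRow, endRow], plus the
      // first entry past endRow, which may still contain overlapping rows.
      static long estimate(List<Entry> index, String startRow, String endRow) {
        long total = 0;
        boolean prevOverlapped = false;
        for (Entry e : index) {
          boolean contained =
              e.lastRow().compareTo(startRow) >= 0 && e.lastRow().compareTo(endRow) <= 0;
          if (contained) {
            total += e.numEntries();
            prevOverlapped = true;
          } else if (prevOverlapped) {
            total += e.numEntries(); // may overlap; the last row alone cannot tell
            break;
          }
        }
        return total;
      }
    }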
++
+     @Override
+     public void reset() {
+       clear();
+     }
    }
  
+   public interface RFileSKVIterator extends FileSKVIterator {
+     FileSKVIterator getIndex() throws IOException;
+ 
+     void reset();
+   }
+ 
+   static abstract class FencedFileSKVIterator implements FileSKVIterator {
+ 
+     private final FileSKVIterator reader;
+     protected final Range fence;
+     private final Key fencedStartKey;
+     private final Supplier<Key> fencedEndKey;
+ 
+     public FencedFileSKVIterator(FileSKVIterator reader, Range fence) {
+       this.reader = Objects.requireNonNull(reader);
+       this.fence = Objects.requireNonNull(fence);
+       this.fencedStartKey = fence.getStartKey();
+       this.fencedEndKey = Suppliers.memoize(() -> getEndKey(fence.getEndKey()));
+     }
+ 
+     @Override
+     public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+         IteratorEnvironment env) throws IOException {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public boolean hasTop() {
+       return reader.hasTop();
+     }
+ 
+     @Override
+     public void next() throws IOException {
+       reader.next();
+     }
+ 
+     @Override
+     public Key getTopKey() {
+       return reader.getTopKey();
+     }
+ 
+     @Override
+     public Value getTopValue() {
+       return reader.getTopValue();
+     }
+ 
+     @Override
+     public Text getFirstRow() throws IOException {
+       var row = reader.getFirstRow();
+       if (row != null && fence.beforeStartKey(new Key(row))) {
+         return fencedStartKey.getRow();
+       } else {
+         return row;
+       }
+     }
+ 
+     @Override
+     public Text getLastRow() throws IOException {
+       var row = reader.getLastRow();
+       if (row != null && fence.afterEndKey(new Key(row))) {
+         return fencedEndKey.get().getRow();
+       } else {
+         return row;
+       }
+     }
+ 
+     @Override
+     public boolean isRunningLowOnMemory() {
+       return reader.isRunningLowOnMemory();
+     }
+ 
+     @Override
+     public void setInterruptFlag(AtomicBoolean flag) {
+       reader.setInterruptFlag(flag);
+     }
+ 
+     @Override
+     public DataInputStream getMetaStore(String name) throws IOException {
+       return reader.getMetaStore(name);
+     }
+ 
+     @Override
+     public void closeDeepCopies() throws IOException {
+       reader.closeDeepCopies();
+     }
+ 
+     @Override
+     public void setCacheProvider(CacheProvider cacheProvider) {
+       reader.setCacheProvider(cacheProvider);
+     }
+ 
+     @Override
+     public void close() throws IOException {
+       reader.close();
+     }
+ 
+     private Key getEndKey(Key key) {
+       // If the key is infinite it will be null, and if inclusive we can use it as is,
+       // as it would be the correct value for getLastKey()
+       if (fence.isInfiniteStopKey() || fence.isEndKeyInclusive()) {
+         return key;
+       }
+ 
+       // If exclusive we need to strip the last byte to get the last key that is part of the
+       // actual range to return
+       final byte[] ba = key.getRow().getBytes();
+       Preconditions.checkArgument(ba.length > 0 && ba[ba.length - 1] == (byte) 0x00);
+       byte[] fba = new byte[ba.length - 1];
+       System.arraycopy(ba, 0, fba, 0, ba.length - 1);
+ 
+       return new Key(fba);
+     }
+ 
+   }
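For context: getEndKey relies on an invariant of these fences: an exclusive end key is always some row followed by a single 0x00 byte, so stripping that byte recovers the greatest row actually inside the fence. A standalone illustration of the byte manipulation (helper name hypothetical, not from this commit):

    import java.util.Arrays;

    final class ExclusiveEndRows {
      // An exclusive end row built from row R is R + 0x00; dropping the
      // trailing zero byte recovers R, the last row inside the range.
      static byte[] stripTrailingZero(byte[] exclusiveEndRow) {
        int n = exclusiveEndRow.length;
        if (n == 0 || exclusiveEndRow[n - 1] != 0x00) {
          throw new IllegalArgumentException("expected a row followed by 0x00");
        }
        return Arrays.copyOf(exclusiveEndRow, n - 1);
      }
    }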
+   static class FencedIndex extends FencedFileSKVIterator {
+     private final FileSKVIterator source;
+ 
+     public FencedIndex(FileSKVIterator source, Range seekFence) {
+       super(source, seekFence);
+       this.source = source;
+     }
+ 
+     @Override
+     public boolean hasTop() {
+       // this code filters out data because the rfile index iterators do not support seek
+ 
+       // If startKey is set then discard everything until we reach the start
+       // of the range
+       if (fence.getStartKey() != null) {
+ 
+         while (source.hasTop() && fence.beforeStartKey(source.getTopKey())) {
+           try {
+             source.next();
+           } catch (IOException e) {
+             throw new UncheckedIOException(e);
+           }
+         }
+       }
+ 
+       // If endKey is set then ensure that the current key has not passed the end of the range
+       return source.hasTop() && !fence.afterEndKey(source.getTopKey());
+     }
+ 
++    @Override
++    public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
++      throw new UnsupportedOperationException();
++    }
++
+     @Override
+     public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+         throws IOException {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+       throw new UnsupportedOperationException();
+     }
+   }
+ 
+   static class FencedReader extends FencedFileSKVIterator implements RFileSKVIterator {
+ 
+     private final Reader reader;
+ 
+     public FencedReader(Reader reader, Range seekFence) {
+       super(reader, seekFence);
+       this.reader = reader;
+     }
+ 
+     @Override
+     public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+         throws IOException {
+       reader.reset();
+ 
+       if (fence != null) {
+         range = fence.clip(range, true);
+         if (range == null) {
+           return;
+         }
+       }
+ 
+       reader.seek(range, columnFamilies, inclusive);
+     }
+ 
+     @Override
+     public FencedReader deepCopy(IteratorEnvironment env) {
+       return new FencedReader(reader.deepCopy(env), fence);
+     }
+ 
+     @Override
+     public FileSKVIterator getIndex() throws IOException {
+       return new FencedIndex(reader.getIndex(), fence);
+     }
+ 
++    @Override
++    public long estimateOverlappingEntries(KeyExtent extent) throws IOException {
++      return reader.estimateOverlappingEntries(extent);
++    }
++
+     @Override
+     public FileSKVIterator getSample(SamplerConfigurationImpl sampleConfig) {
+       final Reader sample = reader.getSample(sampleConfig);
+       return sample != null ? new FencedReader(sample, fence) : null;
+     }
+ 
+     @Override
+     public void reset() {
+       reader.reset();
+     }
+   }
+ 
+   public static RFileSKVIterator getReader(final CachableBuilder cb, final TabletFile dataFile)
+       throws IOException {
+     final RFile.Reader reader = new RFile.Reader(Objects.requireNonNull(cb));
+     return dataFile.hasRange() ? new FencedReader(reader, dataFile.getRange()) : reader;
+   }
+ 
+   public static RFileSKVIterator getReader(final CachableBuilder cb, Range range)
+       throws IOException {
+     final RFile.Reader reader = new RFile.Reader(Objects.requireNonNull(cb));
+     return !range.isInfiniteStartKey() || !range.isInfiniteStopKey()
+         ? new FencedReader(reader, range) : reader;
+   }
  }
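For context: FencedReader never lets a caller seek outside its fence; every requested range is first clipped to the fence, and a disjoint request becomes a no-op. A small sketch of that clipping step using the public Range API (the demo class itself is hypothetical):

    import org.apache.accumulo.core.data.Range;

    final class FenceClipDemo {
      // Mirrors the seek logic above: clip the requested range to the fence;
      // null means the request lies entirely outside the fence.
      static Range clipToFence(Range fence, Range requested) {
        return fence.clip(requested, true);
      }

      public static void main(String[] args) {
        Range fence = new Range("m", true, "t", false); // rows [m, t)
        System.out.println(clipToFence(fence, new Range("a", "z"))); // the overlap with [m, t)
        System.out.println(clipToFence(fence, new Range("u", "z"))); // null, disjoint
      }
    }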
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ebd195a22f,2b85401516..fbba00bab6
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -18,13 -18,14 +18,14 @@@
   */
  package org.apache.accumulo.compactor;
  
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  import static java.util.concurrent.TimeUnit.MINUTES;
 -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
  import java.net.UnknownHostException;
 -import java.security.SecureRandom;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
@@@ -61,18 -61,16 +62,20 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.data.NamespaceId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+ import org.apache.accumulo.core.file.FileOperations;
+ import org.apache.accumulo.core.file.FileSKVIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
 +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
 +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
 -import org.apache.accumulo.core.metadata.TabletFile;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
  import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@@ -105,7 -108,7 +109,8 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TServerUtils;
  import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 +import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.thrift.TException;
  import org.apache.thrift.transport.TTransportException;
@@@ -537,6 -555,27 +548,27 @@@ public class Compactor extends Abstract
      };
    }
  
+   /**
+    * @param extent the extent
+    * @param file the file to read from
+    * @param tableConf the table configuration
+    * @param cryptoService the crypto service
+    * @return an estimate of the number of key/value entries in the file that overlap the extent
+    */
+   private long estimateOverlappingEntries(KeyExtent extent, StoredTabletFile file,
+       AccumuloConfiguration tableConf, CryptoService cryptoService) {
+     FileOperations fileFactory = FileOperations.getInstance();
+     FileSystem fs = getContext().getVolumeManager().getFileSystemByPath(file.getPath());
+ 
+     try (FileSKVIterator reader =
-         fileFactory.newReaderBuilder().forFile(file.getPathStr(), fs, fs.getConf(), cryptoService)
++        fileFactory.newReaderBuilder().forFile(file, fs, fs.getConf(), cryptoService)
+             .withTableConfiguration(tableConf).dropCachesBehind().build()) {
+       return reader.estimateOverlappingEntries(extent);
+     } catch (IOException ioe) {
+       throw new UncheckedIOException(ioe);
+     }
+   }
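For context: the Compactor uses this estimate when reporting external compaction progress (the updated ExternalCompactionProgressIT exercises this path). One plausible use, sketched here as a hypothetical helper rather than code from this commit, is turning entries processed so far into a bounded percentage:

    final class ProgressMath {
      // Hypothetical helper: the estimate can overshoot (see the RFile index
      // walk above), so the percentage is clamped to 100.
      static double percentComplete(long entriesRead, long estimatedTotal) {
        if (estimatedTotal <= 0) {
          return 0.0;
        }
        return Math.min(100.0, (entriesRead * 100.0) / estimatedTotal);
      }
    }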
+ 
    /**
     * Returns the number of seconds to wait in between progress checks based on input file sizes
     *