This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 9144377ba57350b176778b2c2e43a396d86bfa20
Merge: afed77365e 7211081310
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 28 22:06:36 2024 +0000

    Merge branch 'main' into elasticity

 .../org/apache/accumulo/core/fate/AdminUtil.java   | 60 +++++++++-----
 .../accumulo/core/file/BloomFilterLayer.java       |  6 ++
 .../apache/accumulo/core/file/FileSKVIterator.java | 11 +++
 .../core/file/rfile/MultiIndexIterator.java        |  6 ++
 .../org/apache/accumulo/core/file/rfile/RFile.java | 60 ++++++++++++--
 .../iteratorsImpl/system/SequenceFileIterator.java |  6 ++
 .../org/apache/accumulo/compactor/Compactor.java   | 38 ++++++++-
 .../compaction/ExternalCompactionProgressIT.java   | 93 ++++++++++++++++++++--
 8 files changed, 242 insertions(+), 38 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 516daef480,6a5873f2b3..bad466ba75
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -351,77 -333,89 +351,95 @@@ public class AdminUtil<T>
    * @param waitingLocks populated list of locks held by transaction - or an empty map if none.
    * @return current fate and lock status
    */
-  private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
-      EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
-      Map<Long,List<String>> waitingLocks) {
+  private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores,
+      Set<FateId> fateIdFilter, EnumSet<TStatus> statusFilter,
+      EnumSet<FateInstanceType> typesFilter, Map<FateId,List<String>> heldLocks,
+      Map<FateId,List<String>> waitingLocks) {
+    final List<TransactionStatus> statuses = new ArrayList<>();
-    List<Long> transactions = zs.list();
-    List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
+    fateStores.forEach((type, store) -> {
+      try (Stream<FateId> fateIds = store.list().map(FateIdStatus::getFateId)) {
+        fateIds.forEach(fateId -> {
-    for (Long tid : transactions) {
-      try {
-        zs.reserve(tid);
-
-        String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
+          ReadOnlyFateTxStore<T> txStore = store.read(fateId);
++          try {
++            String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
-          String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
-
-          List<String> hlocks = heldLocks.remove(fateId);
-        List<String> hlocks = heldLocks.remove(tid);
++            List<String> hlocks = heldLocks.remove(fateId);
-          if (hlocks == null) {
-            hlocks = Collections.emptyList();
-          }
-        if (hlocks == null) {
-          hlocks = Collections.emptyList();
-        }
-
-        List<String> wlocks = waitingLocks.remove(tid);
++            if (hlocks == null) {
++              hlocks = Collections.emptyList();
++            }
-          List<String> wlocks = waitingLocks.remove(fateId);
-          if (wlocks == null) {
-            wlocks = Collections.emptyList();
-          }
++            List<String> wlocks = waitingLocks.remove(fateId);
-        if (wlocks == null) {
-          wlocks = Collections.emptyList();
-        }
-        String top = null;
-        ReadOnlyRepo<T> repo = zs.top(tid);
-        if (repo != null) {
-          top = repo.getName();
-        }
++            if (wlocks == null) {
++              wlocks = Collections.emptyList();
++            }
-          String top = null;
-          ReadOnlyRepo<T> repo = txStore.top();
-          if (repo != null) {
-            top = repo.getName();
-          }
-        TStatus status = zs.getStatus(tid);
++            String top = null;
++            ReadOnlyRepo<T> repo = txStore.top();
++            if (repo != null) {
++              top = repo.getName();
++            }
-          TStatus status = txStore.getStatus();
-        long timeCreated = zs.timeCreated(tid);
++            TStatus status = txStore.getStatus();
-          long timeCreated = txStore.timeCreated();
-        zs.unreserve(tid, Duration.ZERO);
++            long timeCreated = txStore.timeCreated();
-          if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
-              && includeByInstanceType(fateId.getType(), typesFilter)) {
-            statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top,
-                timeCreated));
-        if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
-          statuses
-              .add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
-        }
-      } catch (Exception e) {
-        // If the cause of the Exception is a NoNodeException, it should be ignored as this
-        // indicates the transaction has completed between the time the list of transactions was
-        // acquired and the time the transaction was probed for info.
-        boolean nne = false;
-        Throwable cause = e;
-        while (cause != null) {
-          if (cause instanceof KeeperException.NoNodeException) {
-            nne = true;
-            break;
++            if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
++                && includeByInstanceType(fateId.getType(), typesFilter)) {
++              statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top,
++                  timeCreated));
++            }
++          } catch (Exception e) {
++            // If the cause of the Exception is a NoNodeException, it should be ignored as this
++            // indicates the transaction has completed between the time the list of transactions was
++            // acquired and the time the transaction was probed for info.
++            boolean nne = false;
++            Throwable cause = e;
++            while (cause != null) {
++              if (cause instanceof KeeperException.NoNodeException) {
++                nne = true;
++                break;
++              }
++              cause = cause.getCause();
++            }
++            if (!nne) {
++              throw e;
++            }
++            log.debug("Tried to get info on a since completed transaction - ignoring {} ", fateId);
           }
-          cause = cause.getCause();
-        }
-        if (!nne) {
-          throw e;
-        }
-        log.debug("Tried to get info on a since completed transaction - ignoring "
-            + FateTxId.formatTid(tid));
+        });
       }
-    }
-
+    });
     return new FateStatus(statuses, heldLocks, waitingLocks);
+  }

+  private boolean includeByStatus(TStatus status, EnumSet<TStatus> statusFilter) {
+    return statusFilter == null || statusFilter.isEmpty() || statusFilter.contains(status);
   }

-  private boolean includeByStatus(TStatus status, EnumSet<TStatus> filterStatus) {
-    return (filterStatus == null) || filterStatus.contains(status);
+  private boolean includeByFateId(FateId fateId, Set<FateId> fateIdFilter) {
+    return fateIdFilter == null || fateIdFilter.isEmpty() || fateIdFilter.contains(fateId);
   }

-  private boolean includeByTxid(Long tid, Set<Long> filterTxid) {
-    return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
+  private boolean includeByInstanceType(FateInstanceType type,
+      EnumSet<FateInstanceType> typesFilter) {
+    return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type);
   }

-  public void printAll(ReadOnlyTStore<T> zs, ZooReader zk,
+  public void printAll(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
       ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
-    print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
+    print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null);
   }

-  public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath,
-      Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+  public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
+      ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<FateId> fateIdFilter,
+      EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter)
       throws KeeperException, InterruptedException {
-    FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);
+    FateStatus fateStatus =
+        getStatus(fateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter);

     for (TransactionStatus txStatus : fateStatus.getTransactions()) {
       fmt.format(
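
Aside: the three new includeBy* helpers above all follow one convention: a null or
empty filter selects every transaction, otherwise only members of the filter pass.
A minimal generic sketch of that predicate (the class and names here are
illustrative, not part of the patch):

    import java.util.Collection;
    import java.util.Set;

    class FilterConvention {
      // Same rule as the includeBy* helpers: no filter (null or empty)
      // means "include everything"; otherwise membership decides.
      static <E> boolean include(E value, Collection<E> filter) {
        return filter == null || filter.isEmpty() || filter.contains(value);
      }

      public static void main(String[] args) {
        System.out.println(include("tx1", null));          // true: no filter given
        System.out.println(include("tx1", Set.of()));      // true: empty filter
        System.out.println(include("tx1", Set.of("tx2"))); // false: not a member
      }
    }
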
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index a07b934f4c,fbba00bab6..2922a3999e
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -61,9 -62,10 +62,11 @@@ import org.apache.accumulo.core.conf.Si
  import org.apache.accumulo.core.data.NamespaceId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+ import org.apache.accumulo.core.file.FileOperations;
+ import org.apache.accumulo.core.file.FileSKVIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
  import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
@@@ -107,7 -109,8 +111,8 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TServerUtils;
  import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 -import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 +import org.apache.accumulo.tserver.log.LogSorter;
+ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.thrift.TException;
  import org.apache.thrift.transport.TTransportException;

diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index b77ec14a8e,0d7936823b..1bcd075577
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -37,9 -38,15 +39,14 @@@ import java.util.concurrent.atomic.Atom
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
@@@ -97,7 -103,7 +104,7 @@@ public class ExternalCompactionProgress
        client.tableOperations().attachIterator(table1, setting,
            EnumSet.of(IteratorUtil.IteratorScope.majc));
        log.info("Compacting table");
-      compact(client, table1, 2, "DCQ1", true);
-      compact(client, table1, 2, QUEUE1, true);
++     compact(client, table1, 2, GROUP1, true);
       verify(client, table1, 2, ROWS);
       log.info("Done Compacting table");
@@@ -108,6 -114,81 +115,72 @@@
     }
   }

+  @Test
+  public void testProgressWithBulkImport() throws Exception {
+    /*
+     * Tests the progress of an external compaction done on a table with bulk imported files.
+     * Progress should stay 0-100. There was previously a bug with the Compactor showing a >100%
+     * progress for compactions with bulk import files.
+     */
+    String[] tableNames = getUniqueNames(2);
+    String tableName1 = tableNames[0];
+    String tableName2 = tableNames[1];
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      log.info("Creating table " + tableName1);
+      createTable(client, tableName1, "cs1");
+      log.info("Creating table " + tableName2);
+      createTable(client, tableName2, "cs1");
+      log.info("Writing " + ROWS + " rows to table " + tableName1);
+      writeData(client, tableName1, ROWS);
+      log.info("Writing " + ROWS + " rows to table " + tableName2);
+      writeData(client, tableName2, ROWS);
+      // This is done to avoid system compactions
+      client.tableOperations().setProperty(tableName1, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+      client.tableOperations().setProperty(tableName2, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+
-      getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
-      getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
-
+      String dir = getDir(client, tableName1);
+
+      log.info("Bulk importing files in dir " + dir + " to table " + tableName2);
+      client.tableOperations().importDirectory(dir).to(tableName2).load();
+      log.info("Finished bulk import");
+
+      log.info("Starting a compaction progress checker thread");
+      Thread checkerThread = startChecker();
+      checkerThread.start();
+
+      log.info("Attaching a slow iterator to table " + tableName2);
+      IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
+      SlowIterator.setSleepTime(setting, 1);
+
+      log.info("Compacting table " + tableName2);
+      client.tableOperations().compact(tableName2,
+          new CompactionConfig().setWait(true).setIterators(List.of(setting)));
+      log.info("Finished compacting table " + tableName2);
+      compactionFinished.set(true);
+
+      log.info("Waiting on progress checker thread");
+      checkerThread.join();
+
+      verifyProgress();
-    } finally {
-      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
-      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+    }
+  }
+
+  /**
+   * @param client an AccumuloClient
+   * @param tableName the table name
+   * @return the directory of the files for the table
+   */
+  private String getDir(AccumuloClient client, String tableName) {
+    var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+
+    try (var tabletsMeta = TabletsMetadata.builder(client).forTable(tableId)
+        .fetch(TabletMetadata.ColumnType.FILES).build()) {
+      return tabletsMeta.iterator().next().getFiles().iterator().next().getPath().getParent()
+          .toUri().getRawPath();
+    }
+  }
+
   public Thread startChecker() {
     return Threads.createThread("RC checker", () -> {
       try {
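
Aside: the bug motivating testProgressWithBulkImport is that the Compactor derives
percent complete from entries read versus an estimated total, and for bulk imported
files the estimate can undercount what the compaction actually reads, pushing a
naive ratio past 100%. A small sketch of a clamped computation (entriesRead and
entriesEstimated are hypothetical stand-ins, not the patch's actual field names):

    final class ProgressSketch {
      // Hypothetical counters: entriesRead is what the compaction has processed
      // so far; entriesEstimated is the up-front estimate, which can undercount
      // the entries in bulk imported files.
      static float percentComplete(long entriesRead, long entriesEstimated) {
        if (entriesEstimated <= 0) {
          return 0f;
        }
        // Clamp so an undercounted estimate cannot report more than 100%.
        return Math.min(100f, (100f * entriesRead) / entriesEstimated);
      }

      public static void main(String[] args) {
        // 150 entries read against an estimate of 100 would naively be 150%.
        System.out.println(percentComplete(150, 100)); // prints 100.0
      }
    }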