This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9144377ba57350b176778b2c2e43a396d86bfa20
Merge: afed77365e 7211081310
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 28 22:06:36 2024 +0000

    Merge branch 'main' into elasticity

 .../org/apache/accumulo/core/fate/AdminUtil.java   | 60 +++++++++-----
 .../accumulo/core/file/BloomFilterLayer.java       |  6 ++
 .../apache/accumulo/core/file/FileSKVIterator.java | 11 +++
 .../core/file/rfile/MultiIndexIterator.java        |  6 ++
 .../org/apache/accumulo/core/file/rfile/RFile.java | 60 ++++++++++++--
 .../iteratorsImpl/system/SequenceFileIterator.java |  6 ++
 .../org/apache/accumulo/compactor/Compactor.java   | 38 ++++++++-
 .../compaction/ExternalCompactionProgressIT.java   | 93 ++++++++++++++++++++--
 8 files changed, 242 insertions(+), 38 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 516daef480,6a5873f2b3..bad466ba75
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -351,77 -333,89 +351,95 @@@ public class AdminUtil<T> 
    * @param waitingLocks populated list of locks waiting to be acquired by transaction - or an empty map if none.
     * @return current fate and lock status
     */
 -  private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
 -      EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
 -      Map<Long,List<String>> waitingLocks) {
 +  private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores,
 +      Set<FateId> fateIdFilter, EnumSet<TStatus> statusFilter,
 +      EnumSet<FateInstanceType> typesFilter, Map<FateId,List<String>> heldLocks,
 +      Map<FateId,List<String>> waitingLocks) {
 +    final List<TransactionStatus> statuses = new ArrayList<>();
  
 -    List<Long> transactions = zs.list();
 -    List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
 +    fateStores.forEach((type, store) -> {
 +      try (Stream<FateId> fateIds = store.list().map(FateIdStatus::getFateId)) {
 +        fateIds.forEach(fateId -> {
  
 -    for (Long tid : transactions) {
 -      try {
 -        zs.reserve(tid);
 -
 -        String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
 +          ReadOnlyFateTxStore<T> txStore = store.read(fateId);
++          try {
++            String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
  
-           String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
- 
-           List<String> hlocks = heldLocks.remove(fateId);
 -        List<String> hlocks = heldLocks.remove(tid);
++            List<String> hlocks = heldLocks.remove(fateId);
  
-           if (hlocks == null) {
-             hlocks = Collections.emptyList();
-           }
 -        if (hlocks == null) {
 -          hlocks = Collections.emptyList();
 -        }
 -
 -        List<String> wlocks = waitingLocks.remove(tid);
++            if (hlocks == null) {
++              hlocks = Collections.emptyList();
++            }
  
-           List<String> wlocks = waitingLocks.remove(fateId);
 -        if (wlocks == null) {
 -          wlocks = Collections.emptyList();
 -        }
++            List<String> wlocks = waitingLocks.remove(fateId);
  
-           if (wlocks == null) {
-             wlocks = Collections.emptyList();
-           }
 -        String top = null;
 -        ReadOnlyRepo<T> repo = zs.top(tid);
 -        if (repo != null) {
 -          top = repo.getName();
 -        }
++            if (wlocks == null) {
++              wlocks = Collections.emptyList();
++            }
  
-           String top = null;
-           ReadOnlyRepo<T> repo = txStore.top();
-           if (repo != null) {
-             top = repo.getName();
-           }
 -        TStatus status = zs.getStatus(tid);
++            String top = null;
++            ReadOnlyRepo<T> repo = txStore.top();
++            if (repo != null) {
++              top = repo.getName();
++            }
  
-           TStatus status = txStore.getStatus();
 -        long timeCreated = zs.timeCreated(tid);
++            TStatus status = txStore.getStatus();
  
-           long timeCreated = txStore.timeCreated();
 -        zs.unreserve(tid, Duration.ZERO);
++            long timeCreated = txStore.timeCreated();
  
-           if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
-               && includeByInstanceType(fateId.getType(), typesFilter)) {
-             statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top,
-                 timeCreated));
 -        if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
 -          statuses
 -              .add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
 -        }
 -      } catch (Exception e) {
 -        // If the cause of the Exception is a NoNodeException, it should be ignored as this
 -        // indicates the transaction has completed between the time the list of transactions was
 -        // acquired and the time the transaction was probed for info.
 -        boolean nne = false;
 -        Throwable cause = e;
 -        while (cause != null) {
 -          if (cause instanceof KeeperException.NoNodeException) {
 -            nne = true;
 -            break;
++            if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
++                && includeByInstanceType(fateId.getType(), typesFilter)) {
++              statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top,
++                  timeCreated));
++            }
++          } catch (Exception e) {
++            // If the cause of the Exception is a NoNodeException, it should be ignored as this
++            // indicates the transaction has completed between the time the list of transactions was
++            // acquired and the time the transaction was probed for info.
++            boolean nne = false;
++            Throwable cause = e;
++            while (cause != null) {
++              if (cause instanceof KeeperException.NoNodeException) {
++                nne = true;
++                break;
++              }
++              cause = cause.getCause();
++            }
++            if (!nne) {
++              throw e;
++            }
++            log.debug("Tried to get info on a since completed transaction - ignoring {} ", fateId);
            }
 -          cause = cause.getCause();
 -        }
 -        if (!nne) {
 -          throw e;
 -        }
 -        log.debug("Tried to get info on a since completed transaction - ignoring "
 -            + FateTxId.formatTid(tid));
 +        });
        }
 -    }
 -
 +    });
      return new FateStatus(statuses, heldLocks, waitingLocks);
 +  }
  
 +  private boolean includeByStatus(TStatus status, EnumSet<TStatus> statusFilter) {
 +    return statusFilter == null || statusFilter.isEmpty() || statusFilter.contains(status);
    }
  
 -  private boolean includeByStatus(TStatus status, EnumSet<TStatus> filterStatus) {
 -    return (filterStatus == null) || filterStatus.contains(status);
 +  private boolean includeByFateId(FateId fateId, Set<FateId> fateIdFilter) {
 +    return fateIdFilter == null || fateIdFilter.isEmpty() || fateIdFilter.contains(fateId);
    }
  
 -  private boolean includeByTxid(Long tid, Set<Long> filterTxid) {
 -    return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
 +  private boolean includeByInstanceType(FateInstanceType type,
 +      EnumSet<FateInstanceType> typesFilter) {
 +    return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type);
    }
  
 -  public void printAll(ReadOnlyTStore<T> zs, ZooReader zk,
 +  public void printAll(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
       ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
 -    print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
 +    print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null);
    }
  
 -  public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath,
 -      Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
 +  public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
 +      ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<FateId> fateIdFilter,
 +      EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter)
        throws KeeperException, InterruptedException {
 -    FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);
 +    FateStatus fateStatus =
 +        getStatus(fateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter);
  
      for (TransactionStatus txStatus : fateStatus.getTransactions()) {
        fmt.format(
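
The three includeBy* helpers added above share one idiom: a null or empty filter set means "match everything", so callers can pass null to disable a dimension of filtering, as printAll does. (The merged versions also treat an empty set as match-all, which the removed includeByStatus did not.) A minimal standalone sketch of the idiom, using illustrative names that are not part of this commit:

import java.util.EnumSet;
import java.util.List;

// Illustrative sketch of the null-or-empty filter idiom used by the
// includeByStatus/includeByFateId/includeByInstanceType helpers above.
public class FilterIdiomSketch {

  // Stand-in enum; the real helpers filter on TStatus, FateId, and FateInstanceType.
  enum Status { NEW, IN_PROGRESS, SUCCESSFUL, FAILED }

  // A null or empty filter imposes no restriction at all.
  static boolean include(Status status, EnumSet<Status> statusFilter) {
    return statusFilter == null || statusFilter.isEmpty() || statusFilter.contains(status);
  }

  public static void main(String[] args) {
    List<Status> all = List.of(Status.NEW, Status.IN_PROGRESS, Status.SUCCESSFUL);

    // No filter: everything passes.
    all.forEach(s -> System.out.println(s + " unfiltered -> " + include(s, null)));

    // Restrict to IN_PROGRESS only.
    EnumSet<Status> filter = EnumSet.of(Status.IN_PROGRESS);
    all.forEach(s -> System.out.println(s + " filtered -> " + include(s, filter)));
  }
}

Passing null for every filter reproduces the printAll behavior above, which forwards null for fateIdFilter, statusFilter, and typesFilter.
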
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index a07b934f4c,fbba00bab6..2922a3999e
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -61,9 -62,10 +62,11 @@@ import org.apache.accumulo.core.conf.Si
  import org.apache.accumulo.core.data.NamespaceId;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.FateId;
  import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
  import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+ import org.apache.accumulo.core.file.FileOperations;
+ import org.apache.accumulo.core.file.FileSKVIterator;
  import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
  import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
@@@ -107,7 -109,8 +111,8 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.rpc.ServerAddress;
  import org.apache.accumulo.server.rpc.TServerUtils;
  import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 -import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 +import org.apache.accumulo.tserver.log.LogSorter;
+ import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.thrift.TException;
  import org.apache.thrift.transport.TTransportException;
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index b77ec14a8e,0d7936823b..1bcd075577
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -37,9 -38,15 +39,14 @@@ import java.util.concurrent.atomic.Atom
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.iterators.IteratorUtil;
+ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
@@@ -97,7 -103,7 +104,7 @@@ public class ExternalCompactionProgress
        client.tableOperations().attachIterator(table1, setting,
            EnumSet.of(IteratorUtil.IteratorScope.majc));
        log.info("Compacting table");
-       compact(client, table1, 2, "DCQ1", true);
 -      compact(client, table1, 2, QUEUE1, true);
++      compact(client, table1, 2, GROUP1, true);
        verify(client, table1, 2, ROWS);
  
        log.info("Done Compacting table");
@@@ -108,6 -114,81 +115,72 @@@
      }
    }
  
+   @Test
+   public void testProgressWithBulkImport() throws Exception {
+     /*
+      * Tests the progress of an external compaction done on a table with bulk imported files.
+      * Progress should stay 0-100. There was previously a bug with the Compactor showing a >100%
+      * progress for compactions with bulk import files.
+      */
+     String[] tableNames = getUniqueNames(2);
+     String tableName1 = tableNames[0];
+     String tableName2 = tableNames[1];
+ 
+     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+       log.info("Creating table " + tableName1);
+       createTable(client, tableName1, "cs1");
+       log.info("Creating table " + tableName2);
+       createTable(client, tableName2, "cs1");
+       log.info("Writing " + ROWS + " rows to table " + tableName1);
+       writeData(client, tableName1, ROWS);
+       log.info("Writing " + ROWS + " rows to table " + tableName2);
+       writeData(client, tableName2, ROWS);
+       // This is done to avoid system compactions
+       client.tableOperations().setProperty(tableName1, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+       client.tableOperations().setProperty(tableName2, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+ 
 -      getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
 -      getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
 -
+       String dir = getDir(client, tableName1);
+ 
+       log.info("Bulk importing files in dir " + dir + " to table " + tableName2);
+       client.tableOperations().importDirectory(dir).to(tableName2).load();
+       log.info("Finished bulk import");
+ 
+       log.info("Starting a compaction progress checker thread");
+       Thread checkerThread = startChecker();
+       checkerThread.start();
+ 
+       log.info("Attaching a slow iterator to table " + tableName2);
+       IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
+       SlowIterator.setSleepTime(setting, 1);
+ 
+       log.info("Compacting table " + tableName2);
+       client.tableOperations().compact(tableName2,
+           new CompactionConfig().setWait(true).setIterators(List.of(setting)));
+       log.info("Finished compacting table " + tableName2);
+       compactionFinished.set(true);
+ 
+       log.info("Waiting on progress checker thread");
+       checkerThread.join();
+ 
+       verifyProgress();
 -    } finally {
 -      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
 -      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+     }
+   }
+ 
+   /**
+    * @param client an AccumuloClient
+    * @param tableName the table name
+    * @return the directory of the files for the table
+    */
+   private String getDir(AccumuloClient client, String tableName) {
+     var tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));
+ 
+     try (var tabletsMeta = TabletsMetadata.builder(client).forTable(tableId)
+         .fetch(TabletMetadata.ColumnType.FILES).build()) {
+       return tabletsMeta.iterator().next().getFiles().iterator().next().getPath().getParent()
+           .toUri().getRawPath();
+     }
+   }
+ 
    public Thread startChecker() {
      return Threads.createThread("RC checker", () -> {
        try {

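The new test's comment explains the motivation: compaction progress on tables with bulk-imported files could previously exceed 100%. The failure mode is visible with plain arithmetic, assuming a hypothetical progress calculation of entries read over an expected total; this is an illustration of the symptom only, not the commit's actual fix, which per the diffstat lands in Compactor.java and the RFile/FileSKVIterator classes:

// Illustration only: shows how a compaction progress percentage can pass
// 100% when the expected total entry count is an underestimate, as can
// happen when the entry counts of bulk-imported input files are estimates.
public class ProgressSketch {

  // Hypothetical progress calculation: entries read so far vs. expected total.
  static double progress(long entriesRead, long expectedTotal) {
    if (expectedTotal <= 0) {
      return 0.0;
    }
    return 100.0 * entriesRead / expectedTotal;
  }

  // A clamped variant keeps the reported value inside the 0-100 range.
  static double clampedProgress(long entriesRead, long expectedTotal) {
    return Math.min(100.0, Math.max(0.0, progress(entriesRead, expectedTotal)));
  }

  public static void main(String[] args) {
    long estimatedTotal = 1_000; // underestimated total from file metadata
    long actuallyRead = 1_500;   // entries the compactor really read

    System.out.println(progress(actuallyRead, estimatedTotal));        // 150.0
    System.out.println(clampedProgress(actuallyRead, estimatedTotal)); // 100.0
  }
}

Clamping alone only hides an inaccurate estimate; keeping the expected total accurate for bulk-imported inputs is what makes the reported progress meaningful, and the test above asserts the reported value stays in range.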