This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ac49b40019da00668241b6ba43c71e6a5c0846cb
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 11 20:05:14 2021 -0500

    With these changes I was able to run an external compaction
---
 .../core/spi/compaction/CompactionJob.java         |  1 +
 .../server/compaction/RetryableThriftCall.java     |  3 +-
 .../coordinator/CompactionCoordinator.java         | 20 +++++++++++--
 .../accumulo/coordinator/QueueAndPriority.java     |  3 +-
 .../org/apache/accumulo/compactor/Compactor.java   | 11 ++++---
 .../accumulo/tserver/ThriftClientHandler.java      |  1 +
 .../tserver/compactions/CompactionManager.java     |  4 +++
 .../tserver/compactions/CompactionService.java     |  5 ++--
 .../tserver/compactions/ExternalCompactionJob.java | 18 ++++++-----
 .../accumulo/tserver/tablet/CompactableImpl.java   | 35 +++++++++++++---------
 10 files changed, 68 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java
index d651a70..8809018 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java
@@ -30,6 +30,7 @@ import 
org.apache.accumulo.core.client.admin.compaction.CompactableFile;
  */
 public interface CompactionJob {
 
+  // CBUG use a lower cardinality type for priority
   long getPriority();
 
   /**
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
index 6e38662..ae5af71 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
@@ -101,8 +101,7 @@ public class RetryableThriftCall<T> {
       try {
         result = function.execute();
       } catch (TException e) {
-        LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", 
waitTime,
-            e.getMessage());
+        LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", 
waitTime, e);
         if (!retryForever) {
           numRetries++;
           if (numRetries > maxNumRetries) {
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b7f54b4..bc5d4c6 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -194,15 +194,16 @@ public class CompactionCoordinator extends AbstractServer
           synchronized (QUEUES) {
             TabletClientService.Client client = null;
             try {
+              LOG.debug("contacting tserver " + tsi.getHostPort());
               client = getTabletServerConnection(tsi);
               List<TCompactionQueueSummary> summaries =
                   client.getCompactionQueueInfo(TraceUtil.traceInfo(), 
getContext().rpcCreds());
               summaries.forEach(summary -> {
                 QueueAndPriority qp =
                     QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
-                QUEUES.putIfAbsent(qp.getQueue(), new TreeMap<>())
-                    .putIfAbsent(qp.getPriority(), new 
LinkedHashSet<>()).add(tsi);
-                INDEX.putIfAbsent(tsi, new HashSet<>()).add(qp);
+                QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
+                    .computeIfAbsent(qp.getPriority(), k -> new 
LinkedHashSet<>()).add(tsi);
+                INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
               });
             } finally {
               ThriftUtil.returnClient(client);
@@ -264,6 +265,8 @@ public class CompactionCoordinator extends AbstractServer
   @Override
   public TExternalCompactionJob getCompactionJob(String queueName, String 
compactorAddress)
       throws TException {
+    // CBUG need to use and check for system credentials
+    LOG.debug("getCompactionJob " + queueName + " " + compactorAddress);
     String queue = queueName.intern();
     TServerInstance tserver = null;
     Long priority = null;
@@ -310,6 +313,8 @@ public class CompactionCoordinator extends AbstractServer
           getContext().rpcCreds(), queue, priority, compactorAddress);
       RUNNING.put(job.getExternalCompactionId(),
           new RunningCompaction(job, compactorAddress, tserver));
+      LOG.debug(
+          "Returning external job id:" + job.externalCompactionId + " to " + 
compactorAddress);
       return job;
     } finally {
       ThriftUtil.returnClient(client);
@@ -430,6 +435,15 @@ public class CompactionCoordinator extends AbstractServer
 
       // TODO: If the call above fails, the RUNNING entry will be orphaned
 
+      /**
+       * One possible way to handle tserver down is to fall back to writing a 
completion entry to
+       * the metadata table. Could be something like row=~extcomp:<uuid> 
family=status
+       * qualifier=complete. The Coordinator can periodically scan this portion 
of the metadata table
+       * and let tablets know. For expediency could still make RPC first to 
let tserver know it's
+       * done and if that fails could fall back to writing to metadata table. 
The coordinator could
+       * read and write to the metadata table section.
+       */
+
     }
   }
 
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
index 641fa97..d8e7fdc 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
@@ -27,7 +27,8 @@ public class QueueAndPriority implements 
Comparable<QueueAndPriority> {
   private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new 
WeakHashMap<>();
 
   public static QueueAndPriority get(String queue, Long priority) {
-    return CACHE.putIfAbsent(new Pair<>(queue, priority), new 
QueueAndPriority(queue, priority));
+    return CACHE.computeIfAbsent(new Pair<>(queue, priority),
+        k -> new QueueAndPriority(queue, priority));
   }
 
   private final String queue;
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 44a397b..73e790d 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -256,15 +256,18 @@ public class Compactor extends AbstractServer
    */
   protected void updateCompactionState(TExternalCompactionJob job, 
CompactionState state,
       String message) throws RetriesExceededException {
-    RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
-        RetryableThriftCall.MAX_WAIT_TIME, 25, new 
RetryableThriftFunction<Void>() {
+    // CBUG the return type was changed from Void to String just to make this 
work. When type was
+    // Void and returned null, it would retry forever. Could specialize 
RetryableThriftCall for case
+    // w/ no return type.
+    RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000,
+        RetryableThriftCall.MAX_WAIT_TIME, 25, new 
RetryableThriftFunction<String>() {
           @Override
-          public Void execute() throws TException {
+          public String execute() throws TException {
             try {
               coordinatorClient.compareAndSet(null, getCoordinatorClient());
               
coordinatorClient.get().updateCompactionStatus(job.getExternalCompactionId(), 
state,
                   message, System.currentTimeMillis());
-              return null;
+              return "";
             } catch (TException e) {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
               throw e;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index 674cdef..e1897b2 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -1687,6 +1687,7 @@ class ThriftClientHandler extends ClientServiceHandler 
implements TabletClientSe
       return extCompaction.toThrift();
     }
 
+    // CBUG thrift may not support null return types 
https://thrift.apache.org/docs/features.html
     return null;
   }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 8626e14..392c7c1 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -407,10 +407,14 @@ public class CompactionManager {
 
   public ExternalCompactionJob reserveExternalCompaction(String queueName, 
long priority,
       String compactorId) {
+    log.debug("Attempting to reserved external compaction queue:{} priority:{} 
compactor:{}",
+        queueName, priority, compactorId);
+
     ExternalCompactionExecutor extCE = getExternalExecutor(queueName);
     var ecJob = extCE.reserveExternalCompaction(priority, compactorId);
     if (ecJob != null) {
       runningExternalCompactions.put(ecJob.getExternalCompactionId(), 
ecJob.getExtent());
+      log.debug("Reserved external compaction ecid:{}", 
ecJob.getExternalCompactionId());
     }
     return ecJob;
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index a1febbe..c7fa6c1 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -158,8 +158,9 @@ public class CompactionService {
 
     this.rateLimit.set(maxRate);
 
-    // TODO it may make sense to move the rate limit config to the planner and 
executors... it makes
-    // no sense at the service level for a mix of internal and external 
compactions
+    // CBUG it may make sense to move the rate limit config to the planner and 
executors... it makes
+    // no sense at the service level for a mix of internal and external 
compactions... makes a lot
+    // more sense at the executor level
     this.readLimiter = 
SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration())
         .create("CS_" + serviceName + "_read", () -> rateLimit.get());
     this.writeLimiter = 
SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration())
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index 8c0b794..4e30ea3 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.tserver.compactions;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -49,14 +50,14 @@ public class ExternalCompactionJob {
   public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean 
propogateDeletes,
       TabletFile compactTmpName, KeyExtent extent, UUID externalCompactionId, 
long priority,
       CompactionKind kind, List<IteratorSetting> iters) {
-    this.jobFiles = jobFiles;
+    this.jobFiles = Objects.requireNonNull(jobFiles);
     this.propogateDeletes = propogateDeletes;
-    this.compactTmpName = compactTmpName;
-    this.extent = extent;
-    this.externalCompactionId = externalCompactionId;
+    this.compactTmpName = Objects.requireNonNull(compactTmpName);
+    this.extent = Objects.requireNonNull(extent);
+    this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
     this.priority = priority;
-    this.kind = kind;
-    this.iters = iters;
+    this.kind = Objects.requireNonNull(kind);
+    this.iters = Objects.requireNonNull(iters);
   }
 
   public TExternalCompactionJob toThrift() {
@@ -89,10 +90,13 @@ public class ExternalCompactionJob {
     List<InputFile> files = jobFiles.stream().map(stf -> new 
InputFile(stf.getPathStr(), 0, 0, 0))
         .collect(Collectors.toList());
 
+    // CBUG there seem to be two CompactionKind thrift types
+    // CBUG rename CompactionKind thrift type to TCompactionKind
     // TODO priority cast and compactionId cast... compactionId could be null 
I think
     return new TExternalCompactionJob(externalCompactionId.toString(), 
extent.toThrift(), files,
         (int) priority, readRate, writeRate, iteratorSettings, type, reason,
-        compactTmpName.getPathStr(), propogateDeletes, null);
+        compactTmpName.getPathStr(), propogateDeletes,
+        
org.apache.accumulo.core.tabletserver.thrift.CompactionKind.valueOf(kind.name()));
   }
 
   public UUID getExternalCompactionId() {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 8938027..3d1f5d5 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -561,7 +561,7 @@ public class CompactableImpl implements Compactable {
   private CompactionInfo reserveFilesForCompaction(CompactionServiceId 
service, CompactionJob job) {
     CompactionInfo cInfo = new CompactionInfo();
 
-    Set<StoredTabletFile> jobFiles = job.getFiles().stream()
+    cInfo.jobFiles = job.getFiles().stream()
         .map(cf -> ((CompactableFileImpl) 
cf).getStortedTabletFile()).collect(Collectors.toSet());
 
     if (job.getKind() == CompactionKind.USER)
@@ -584,9 +584,10 @@ public class CompactableImpl implements Compactable {
         case SELECTED: {
           if (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR) {
             if (selectKind == job.getKind()) {
-              if (!selectedFiles.containsAll(jobFiles)) {
+              if (!selectedFiles.containsAll(cInfo.jobFiles)) {
                 log.error("Ignoring {} compaction that does not contain 
selected files {} {} {}",
-                    job.getKind(), getExtent(), asFileNames(selectedFiles), 
asFileNames(jobFiles));
+                    job.getKind(), getExtent(), asFileNames(selectedFiles),
+                    asFileNames(cInfo.jobFiles));
                 return null;
               }
             } else {
@@ -594,9 +595,9 @@ public class CompactableImpl implements Compactable {
                   getExtent());
               return null;
             }
-          } else if (!Collections.disjoint(selectedFiles, jobFiles)) {
+          } else if (!Collections.disjoint(selectedFiles, cInfo.jobFiles)) {
             log.trace("Ingoring compaction that overlaps with selected files 
{} {} {}", getExtent(),
-                job.getKind(), asFileNames(Sets.intersection(selectedFiles, 
jobFiles)));
+                job.getKind(), asFileNames(Sets.intersection(selectedFiles, 
cInfo.jobFiles)));
             return null;
           }
           break;
@@ -614,8 +615,8 @@ public class CompactableImpl implements Compactable {
           throw new AssertionError();
       }
 
-      if (Collections.disjoint(allCompactingFiles, jobFiles)) {
-        allCompactingFiles.addAll(jobFiles);
+      if (Collections.disjoint(allCompactingFiles, cInfo.jobFiles)) {
+        allCompactingFiles.addAll(cInfo.jobFiles);
         runnningJobs.add(job);
       } else {
         return null;
@@ -627,7 +628,8 @@ public class CompactableImpl implements Compactable {
         case SELECTOR:
         case USER:
           Preconditions.checkState(selectStatus == SpecialStatus.SELECTED);
-          if (job.getKind() == selectKind && selectedAll && 
jobFiles.containsAll(selectedFiles)) {
+          if (job.getKind() == selectKind && selectedAll
+              && cInfo.jobFiles.containsAll(selectedFiles)) {
             cInfo.propogateDeletes = false;
           }
           break;
@@ -640,7 +642,7 @@ public class CompactableImpl implements Compactable {
       }
 
       if (job.getKind() == CompactionKind.USER && selectKind == job.getKind()
-          && selectedFiles.equals(jobFiles)) {
+          && selectedFiles.equals(cInfo.jobFiles)) {
         cInfo.compactionId = this.compactionId;
       }
 
@@ -729,9 +731,9 @@ public class CompactableImpl implements Compactable {
     if (cInfo == null)
       return null;
 
-    // TODO add external compaction info to metadata table
+    // CBUG add external compaction info to metadata table
     try {
-      // TODO share code w/ CompactableUtil and/or move there
+      // CBUG share code w/ CompactableUtil and/or move there
       cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" 
: "C");
       cInfo.compactTmpName = new TabletFile(new 
Path(cInfo.newFile.getMetaInsert() + "_tmp"));
 
@@ -741,35 +743,40 @@ public class CompactableImpl implements Compactable {
 
       externalCompactions.put(externalCompactionId, cInfo);
 
-      // TODO because this is an RPC the return may never get to the caller... 
however the caller
+      // CBUG because this is an RPC the return may never get to the caller... 
however the caller
       // may be alive.... maybe the caller can set the externalCompactionId it 
working on in ZK
       return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, 
cInfo.compactTmpName,
           getExtent(), externalCompactionId, job.getPriority(), job.getKind(), 
cInfo.iters);
 
     } catch (Exception e) {
-      // TODO unreserve files for compaction!
+      // CBUG unreserve files for compaction!
       throw new RuntimeException(e);
     }
   }
 
   @Override
   public void commitExternalCompaction(UUID extCompactionId, long fileSize, 
long entries) {
-    // TODO double check w/ java docs that only one thread can remove
+    // CBUG double check w/ java docs that only one thread can remove
     CompactionInfo cInfo = externalCompactions.remove(extCompactionId);
 
     if (cInfo != null) {
+      log.debug("Attempting to commit external compaction {}", 
extCompactionId);
       // TODO do a sanity check that files exists in dfs?
       StoredTabletFile metaFile = null;
       try {
         metaFile = 
tablet.getDatafileManager().bringMajorCompactionOnline(cInfo.jobFiles,
             cInfo.compactTmpName, cInfo.newFile, compactionId,
             new DataFileValue(fileSize, entries));
+        TabletLogger.compacted(getExtent(), cInfo.job, metaFile);
       } catch (Exception e) {
         metaFile = null;
         throw new RuntimeException(e);
       } finally {
         completeCompaction(cInfo.job, cInfo.jobFiles, metaFile);
       }
+    } else {
+      log.debug("Ignoring request to commit external compaction that is 
unknown {}",
+          extCompactionId);
     }
   }
 

Reply via email to