This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 212e3cd27c updates bulk import and compaction logging (#4440)
212e3cd27c is described below

commit 212e3cd27cf9701e750ca8c3a2566a74d88867f3
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Apr 10 10:56:01 2024 -0400

    updates bulk import and compaction logging (#4440)
---
 .../coordinator/commit/CommitCompaction.java       | 20 ++++----
 .../manager/tableOps/bulkVer2/LoadFiles.java       | 54 +++++++++++++---------
 .../manager/tableOps/compact/CompactionDriver.java |  2 -
 3 files changed, 41 insertions(+), 35 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index cc0432d4a0..7add060466 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -130,10 +130,11 @@ public class CommitCompaction extends ManagerRepo {
         tabletMutator
             .submit(tabletMetadata -> 
!tabletMetadata.getExternalCompactions().containsKey(ecid));
 
-        // TODO expensive logging
-        LOG.debug("Compaction completed {} added {} removed {}", 
tablet.getExtent(), newDatafile,
-            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
-                .collect(Collectors.toList()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compaction completed {} added {} removed {}", 
tablet.getExtent(), newDatafile,
+              ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
+                  .collect(Collectors.toList()));
+        }
 
         var result = tabletsMutator.process().get(getExtent());
         if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
@@ -159,7 +160,7 @@ public class CommitCompaction extends ManagerRepo {
   private void updateTabletForCompaction(TCompactionStats stats, 
ExternalCompactionId ecid,
       TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, 
CompactionMetadata ecm,
       Ample.ConditionalTabletMutator tabletMutator) {
-    // ELASTICITY_TODO improve logging adapt to use existing tablet files 
logging
+
     if (ecm.getKind() == CompactionKind.USER) {
       if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
         // all files selected for the user compactions are finished, so the 
tablet is finish and
@@ -171,8 +172,7 @@ public class CommitCompaction extends ManagerRepo {
             "Tablet %s unexpected has selected files and compacted columns for 
%s",
             tablet.getExtent(), fateId);
 
-        // TODO set to trace
-        LOG.debug("All selected files compacted for {} setting compacted for 
{}",
+        LOG.trace("All selected files compacted for {} setting compacted for 
{}",
             tablet.getExtent(), tablet.getSelectedFiles().getFateId());
 
         tabletMutator.deleteSelectedFiles();
@@ -187,14 +187,12 @@ public class CommitCompaction extends ManagerRepo {
         newSelectedFileSet.removeAll(ecm.getJobFiles());
 
         if (newDatafile.isPresent()) {
-          // TODO set to trace
-          LOG.debug(
+          LOG.trace(
               "Not all selected files for {} are done, adding new selected 
file {} from compaction",
               tablet.getExtent(), 
newDatafile.orElseThrow().getPath().getName());
           newSelectedFileSet.add(newDatafile.orElseThrow().insert());
         } else {
-          // TODO set to trace
-          LOG.debug(
+          LOG.trace(
               "Not all selected files for {} are done, compaction produced no 
output so not adding to selected set.",
               tablet.getExtent());
         }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index d336358700..1689ae4359 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.clientImpl.bulk.Bulk;
@@ -44,9 +45,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.manager.thrift.BulkImportState;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -67,6 +70,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 /**
@@ -112,6 +116,7 @@ class LoadFiles extends ManagerRepo {
     protected FateId fateId;
     protected boolean setTime;
     Ample.ConditionalTabletsMutator conditionalMutator;
+    private Map<KeyExtent,List<TabletFile>> loadingFiles;
 
     private long skipped = 0;
 
@@ -122,6 +127,7 @@ class LoadFiles extends ManagerRepo {
       this.setTime = setTime;
       conditionalMutator = 
manager.getContext().getAmple().conditionallyMutateTablets();
       this.skipped = 0;
+      this.loadingFiles = new HashMap<>();
     }
 
     void load(List<TabletMetadata> tablets, Files files) {
@@ -213,6 +219,11 @@ class LoadFiles extends ManagerRepo {
             tabletMutator.putTime(tabletTime.getMetadataTime());
           }
 
+          // Hang on to for logging purposes in the case where the update is a
+          // success.
+          Preconditions.checkState(
+              loadingFiles.put(tablet.getExtent(), 
List.copyOf(filesToLoad.keySet())) == null);
+
           tabletMutator.submit(tm -> false);
         }
       }
@@ -279,30 +290,29 @@ class LoadFiles extends ManagerRepo {
     long finish() {
       var results = conditionalMutator.process();
 
-      boolean allDone =
-          results.values().stream().allMatch(result -> result.getStatus() == 
Status.ACCEPTED)
-              && skipped == 0;
-
-      long sleepTime = 0;
-      if (!allDone) {
-        sleepTime = 1000;
-
-        results.forEach((extent, condResult) -> {
-          if (condResult.getStatus() != Status.ACCEPTED) {
-            var metadata = condResult.readMetadata();
-            if (metadata == null) {
-              log.debug("Tablet update failed, tablet is gone {} {} {}", 
fateId, extent,
-                  condResult.getStatus());
-            } else {
-              log.debug("Tablet update failed {} {} {} {} {} {}", fateId, 
extent,
-                  condResult.getStatus(), metadata.getOperationId(), 
metadata.getLocation(),
-                  metadata.getLoaded());
-            }
+      AtomicBoolean seenFailure = new AtomicBoolean(false);
+      results.forEach((extent, condResult) -> {
+        if (condResult.getStatus() == Status.ACCEPTED) {
+          loadingFiles.get(extent).forEach(file -> 
TabletLogger.bulkImported(extent, file));
+        } else {
+          seenFailure.set(true);
+          var metadata = condResult.readMetadata();
+          if (metadata == null) {
+            log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, 
extent,
+                condResult.getStatus());
+          } else {
+            log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent,
+                condResult.getStatus(), metadata.getOperationId(), 
metadata.getLocation(),
+                metadata.getLoaded());
           }
-        });
-      }
+        }
+      });
 
-      return sleepTime;
+      if (seenFailure.get() || skipped != 0) {
+        return 1000;
+      } else {
+        return 0;
+      }
     }
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 200a48a089..d9424b68fe 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -346,8 +346,6 @@ class CompactionDriver extends ManagerRepo {
   private void cleanupTabletMetadata(FateId fateId, Manager manager) throws 
Exception {
     var ample = manager.getContext().getAmple();
 
-    // ELASTICITY_TODO use existing compaction logging
-
     boolean allCleanedUp = false;
 
     Retry retry = 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))

Reply via email to