This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 212e3cd27c updates bulk import and compaction logging (#4440) 212e3cd27c is described below commit 212e3cd27cf9701e750ca8c3a2566a74d88867f3 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Apr 10 10:56:01 2024 -0400 updates bulk import and compaction logging (#4440) --- .../coordinator/commit/CommitCompaction.java | 20 ++++---- .../manager/tableOps/bulkVer2/LoadFiles.java | 54 +++++++++++++--------- .../manager/tableOps/compact/CompactionDriver.java | 2 - 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index cc0432d4a0..7add060466 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -130,10 +130,11 @@ public class CommitCompaction extends ManagerRepo { tabletMutator .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); - // TODO expensive logging - LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, - ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList())); + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, + ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList())); + } var result = tabletsMutator.process().get(getExtent()); if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { @@ -159,7 +160,7 @@ public class CommitCompaction extends ManagerRepo { private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) { - // ELASTICITY_TODO improve logging adapt to use existing tablet files logging + if (ecm.getKind() == CompactionKind.USER) { if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { // all files selected for the user compactions are finished, so the tablet is finish and @@ -171,8 +172,7 @@ public class CommitCompaction extends ManagerRepo { "Tablet %s unexpected has selected files and compacted columns for %s", tablet.getExtent(), fateId); - // TODO set to trace - LOG.debug("All selected files compacted for {} setting compacted for {}", + LOG.trace("All selected files compacted for {} setting compacted for {}", tablet.getExtent(), tablet.getSelectedFiles().getFateId()); tabletMutator.deleteSelectedFiles(); @@ -187,14 +187,12 @@ public class CommitCompaction extends ManagerRepo { newSelectedFileSet.removeAll(ecm.getJobFiles()); if (newDatafile.isPresent()) { - // TODO set to trace - LOG.debug( + LOG.trace( "Not all selected files for {} are done, adding new selected file {} from compaction", tablet.getExtent(), newDatafile.orElseThrow().getPath().getName()); newSelectedFileSet.add(newDatafile.orElseThrow().insert()); } else { - // TODO set to trace - LOG.debug( + LOG.trace( "Not all selected files for {} are done, compaction produced no output so not adding to selected set.", tablet.getExtent()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index d336358700..1689ae4359 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.accumulo.core.clientImpl.bulk.Bulk; @@ -44,9 +45,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -67,6 +70,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; /** @@ -112,6 +116,7 @@ class LoadFiles extends ManagerRepo { protected FateId fateId; protected boolean setTime; Ample.ConditionalTabletsMutator conditionalMutator; + private Map<KeyExtent,List<TabletFile>> loadingFiles; private long skipped = 0; @@ -122,6 +127,7 @@ class LoadFiles extends ManagerRepo { this.setTime = setTime; conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets(); this.skipped = 0; + this.loadingFiles = new HashMap<>(); } void load(List<TabletMetadata> tablets, Files files) { @@ -213,6 +219,11 @@ class LoadFiles extends ManagerRepo { tabletMutator.putTime(tabletTime.getMetadataTime()); } + // Hang on to for logging purposes in the case where the update is a + // success. + Preconditions.checkState( + loadingFiles.put(tablet.getExtent(), List.copyOf(filesToLoad.keySet())) == null); + tabletMutator.submit(tm -> false); } } @@ -279,30 +290,29 @@ class LoadFiles extends ManagerRepo { long finish() { var results = conditionalMutator.process(); - boolean allDone = - results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED) - && skipped == 0; - - long sleepTime = 0; - if (!allDone) { - sleepTime = 1000; - - results.forEach((extent, condResult) -> { - if (condResult.getStatus() != Status.ACCEPTED) { - var metadata = condResult.readMetadata(); - if (metadata == null) { - log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, extent, - condResult.getStatus()); - } else { - log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent, - condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(), - metadata.getLoaded()); - } + AtomicBoolean seenFailure = new AtomicBoolean(false); + results.forEach((extent, condResult) -> { + if (condResult.getStatus() == Status.ACCEPTED) { + loadingFiles.get(extent).forEach(file -> TabletLogger.bulkImported(extent, file)); + } else { + seenFailure.set(true); + var metadata = condResult.readMetadata(); + if (metadata == null) { + log.debug("Tablet update failed, tablet is gone {} {} {}", fateId, extent, + condResult.getStatus()); + } else { + log.debug("Tablet update failed {} {} {} {} {} {}", fateId, extent, + condResult.getStatus(), metadata.getOperationId(), metadata.getLocation(), + metadata.getLoaded()); } - }); - } + } + }); - return sleepTime; + if (seenFailure.get() || skipped != 0) { + return 1000; + } else { + return 0; + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 200a48a089..d9424b68fe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -346,8 +346,6 @@ class CompactionDriver extends ManagerRepo { private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception { var ample = manager.getContext().getAmple(); - // ELASTICITY_TODO use existing compaction logging - boolean allCleanedUp = false; Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))