This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ac49b40019da00668241b6ba43c71e6a5c0846cb Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Mar 11 20:05:14 2021 -0500 With these changes was able to run an external compaction --- .../core/spi/compaction/CompactionJob.java | 1 + .../server/compaction/RetryableThriftCall.java | 3 +- .../coordinator/CompactionCoordinator.java | 20 +++++++++++-- .../accumulo/coordinator/QueueAndPriority.java | 3 +- .../org/apache/accumulo/compactor/Compactor.java | 11 ++++--- .../accumulo/tserver/ThriftClientHandler.java | 1 + .../tserver/compactions/CompactionManager.java | 4 +++ .../tserver/compactions/CompactionService.java | 5 ++-- .../tserver/compactions/ExternalCompactionJob.java | 18 ++++++----- .../accumulo/tserver/tablet/CompactableImpl.java | 35 +++++++++++++--------- 10 files changed, 68 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java index d651a70..8809018 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.admin.compaction.CompactableFile; */ public interface CompactionJob { + // CBUG use a lower cardinality type for priority long getPriority(); /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java index 6e38662..ae5af71 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java @@ -101,8 +101,7 @@ public class RetryableThriftCall<T> { try { result = function.execute(); } catch (TException e) { - LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", waitTime, - e.getMessage()); + LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", waitTime, e); if (!retryForever) { numRetries++; if (numRetries > maxNumRetries) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b7f54b4..bc5d4c6 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -194,15 +194,16 @@ public class CompactionCoordinator extends AbstractServer synchronized (QUEUES) { TabletClientService.Client client = null; try { + LOG.debug("contacting tserver " + tsi.getHostPort()); client = getTabletServerConnection(tsi); List<TCompactionQueueSummary> summaries = client.getCompactionQueueInfo(TraceUtil.traceInfo(), getContext().rpcCreds()); summaries.forEach(summary -> { QueueAndPriority qp = QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); - QUEUES.putIfAbsent(qp.getQueue(), new TreeMap<>()) - .putIfAbsent(qp.getPriority(), new LinkedHashSet<>()).add(tsi); - INDEX.putIfAbsent(tsi, new HashSet<>()).add(qp); + QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>()) + .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi); + INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp); }); } finally { ThriftUtil.returnClient(client); @@ -264,6 +265,8 @@ public class CompactionCoordinator extends AbstractServer @Override public TExternalCompactionJob getCompactionJob(String queueName, String compactorAddress) throws TException { + // CBUG need to use and check for system credentials + LOG.debug("getCompactionJob " + queueName + " " + compactorAddress); String queue = queueName.intern(); TServerInstance tserver = null; Long priority = null; @@ -310,6 +313,8 @@ public class CompactionCoordinator extends AbstractServer getContext().rpcCreds(), queue, priority, compactorAddress); RUNNING.put(job.getExternalCompactionId(), new RunningCompaction(job, compactorAddress, tserver)); + LOG.debug( + "Returning external job id:" + job.externalCompactionId + " to " + compactorAddress); return job; } finally { ThriftUtil.returnClient(client); @@ -430,6 +435,15 @@ public class CompactionCoordinator extends AbstractServer // TODO: If the call above fails, the RUNNING entry will be orphaned + /** + * One possible way to handle tserver down is to fall back to writing a completion entry to + * the metadata table. Could be something like row=~extcomp:<uuid> family=status + * qualifier=complete The Coordinator can periodically scan this portion of the metadata table + * and let tablets know. For expediency could still make RPC first to let tserver know its + * done and if that fails could fall back to writing to metadata table. The coordinator could + * read and write to the metadata table section. + */ + } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java index 641fa97..d8e7fdc 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java @@ -27,7 +27,8 @@ public class QueueAndPriority implements Comparable<QueueAndPriority> { private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>(); public static QueueAndPriority get(String queue, Long priority) { - return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority)); + return CACHE.computeIfAbsent(new Pair<>(queue, priority), + k -> new QueueAndPriority(queue, priority)); } private final String queue; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 44a397b..73e790d 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -256,15 +256,18 @@ public class Compactor extends AbstractServer */ protected void updateCompactionState(TExternalCompactionJob job, CompactionState state, String message) throws RetriesExceededException { - RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000, - RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() { + // CBUG the return type was changed from Void to String just to make this work. When type was + // Void and returned null, it would retry forever. Could specialize RetryableThriftCall for case + // w/ not return type. + RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() { @Override - public Void execute() throws TException { + public String execute() throws TException { try { coordinatorClient.compareAndSet(null, getCoordinatorClient()); coordinatorClient.get().updateCompactionStatus(job.getExternalCompactionId(), state, message, System.currentTimeMillis()); - return null; + return ""; } catch (TException e) { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); throw e; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index 674cdef..e1897b2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -1687,6 +1687,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe return extCompaction.toThrift(); } + // CBUG thrift may not support null return types https://thrift.apache.org/docs/features.html return null; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 8626e14..392c7c1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -407,10 +407,14 @@ public class CompactionManager { public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority, String compactorId) { + log.debug("Attempting to reserved external compaction queue:{} priority:{} compactor:{}", + queueName, priority, compactorId); + ExternalCompactionExecutor extCE = getExternalExecutor(queueName); var ecJob = extCE.reserveExternalCompaction(priority, compactorId); if (ecJob != null) { runningExternalCompactions.put(ecJob.getExternalCompactionId(), ecJob.getExtent()); + log.debug("Reserved external compaction ecid:{}", ecJob.getExternalCompactionId()); } return ecJob; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index a1febbe..c7fa6c1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -158,8 +158,9 @@ public class CompactionService { this.rateLimit.set(maxRate); - // TODO it may make sense to move the rate limit config to the planner and executors... it makes - // no sense at the service level for a mix of internal and external compactions + // CBUG it may make sense to move the rate limit config to the planner and executors... it makes + // no sense at the service level for a mix of internal and external compactions... makes a lot + // more sense at the executor level this.readLimiter = SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration()) .create("CS_" + serviceName + "_read", () -> rateLimit.get()); this.writeLimiter = SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration()) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index 8c0b794..4e30ea3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.compactions; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -49,14 +50,14 @@ public class ExternalCompactionJob { public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes, TabletFile compactTmpName, KeyExtent extent, UUID externalCompactionId, long priority, CompactionKind kind, List<IteratorSetting> iters) { - this.jobFiles = jobFiles; + this.jobFiles = Objects.requireNonNull(jobFiles); this.propogateDeletes = propogateDeletes; - this.compactTmpName = compactTmpName; - this.extent = extent; - this.externalCompactionId = externalCompactionId; + this.compactTmpName = Objects.requireNonNull(compactTmpName); + this.extent = Objects.requireNonNull(extent); + this.externalCompactionId = Objects.requireNonNull(externalCompactionId); this.priority = priority; - this.kind = kind; - this.iters = iters; + this.kind = Objects.requireNonNull(kind); + this.iters = Objects.requireNonNull(iters); } public TExternalCompactionJob toThrift() { @@ -89,10 +90,13 @@ public class ExternalCompactionJob { List<InputFile> files = jobFiles.stream().map(stf -> new InputFile(stf.getPathStr(), 0, 0, 0)) .collect(Collectors.toList()); + // CBUG there seem to be two CompactionKind thrift types + // CBUG rename CompactionKind thrift type to TCompactionKind // TODO priority cast and compactionId cast... compactionId could be null I think return new TExternalCompactionJob(externalCompactionId.toString(), extent.toThrift(), files, (int) priority, readRate, writeRate, iteratorSettings, type, reason, - compactTmpName.getPathStr(), propogateDeletes, null); + compactTmpName.getPathStr(), propogateDeletes, + org.apache.accumulo.core.tabletserver.thrift.CompactionKind.valueOf(kind.name())); } public UUID getExternalCompactionId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 8938027..3d1f5d5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -561,7 +561,7 @@ public class CompactableImpl implements Compactable { private CompactionInfo reserveFilesForCompaction(CompactionServiceId service, CompactionJob job) { CompactionInfo cInfo = new CompactionInfo(); - Set<StoredTabletFile> jobFiles = job.getFiles().stream() + cInfo.jobFiles = job.getFiles().stream() .map(cf -> ((CompactableFileImpl) cf).getStortedTabletFile()).collect(Collectors.toSet()); if (job.getKind() == CompactionKind.USER) @@ -584,9 +584,10 @@ public class CompactableImpl implements Compactable { case SELECTED: { if (job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR) { if (selectKind == job.getKind()) { - if (!selectedFiles.containsAll(jobFiles)) { + if (!selectedFiles.containsAll(cInfo.jobFiles)) { log.error("Ignoring {} compaction that does not contain selected files {} {} {}", - job.getKind(), getExtent(), asFileNames(selectedFiles), asFileNames(jobFiles)); + job.getKind(), getExtent(), asFileNames(selectedFiles), + asFileNames(cInfo.jobFiles)); return null; } } else { @@ -594,9 +595,9 @@ public class CompactableImpl implements Compactable { getExtent()); return null; } - } else if (!Collections.disjoint(selectedFiles, jobFiles)) { + } else if (!Collections.disjoint(selectedFiles, cInfo.jobFiles)) { log.trace("Ingoring compaction that overlaps with selected files {} {} {}", getExtent(), - job.getKind(), asFileNames(Sets.intersection(selectedFiles, jobFiles))); + job.getKind(), asFileNames(Sets.intersection(selectedFiles, cInfo.jobFiles))); return null; } break; @@ -614,8 +615,8 @@ public class CompactableImpl implements Compactable { throw new AssertionError(); } - if (Collections.disjoint(allCompactingFiles, jobFiles)) { - allCompactingFiles.addAll(jobFiles); + if (Collections.disjoint(allCompactingFiles, cInfo.jobFiles)) { + allCompactingFiles.addAll(cInfo.jobFiles); runnningJobs.add(job); } else { return null; @@ -627,7 +628,8 @@ public class CompactableImpl implements Compactable { case SELECTOR: case USER: Preconditions.checkState(selectStatus == SpecialStatus.SELECTED); - if (job.getKind() == selectKind && selectedAll && jobFiles.containsAll(selectedFiles)) { + if (job.getKind() == selectKind && selectedAll + && cInfo.jobFiles.containsAll(selectedFiles)) { cInfo.propogateDeletes = false; } break; @@ -640,7 +642,7 @@ public class CompactableImpl implements Compactable { } if (job.getKind() == CompactionKind.USER && selectKind == job.getKind() - && selectedFiles.equals(jobFiles)) { + && selectedFiles.equals(cInfo.jobFiles)) { cInfo.compactionId = this.compactionId; } @@ -729,9 +731,9 @@ public class CompactableImpl implements Compactable { if (cInfo == null) return null; - // TODO add external compaction info to metadata table + // CBUG add external compaction info to metadata table try { - // TODO share code w/ CompactableUtil and/or move there + // CBUG share code w/ CompactableUtil and/or move there cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C"); cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp")); @@ -741,35 +743,40 @@ public class CompactableImpl implements Compactable { externalCompactions.put(externalCompactionId, cInfo); - // TODO because this is an RPC the return may never get to the caller... however the caller + // CBUG because this is an RPC the return may never get to the caller... however the caller // may be alive.... maybe the caller can set the externalCompactionId it working on in ZK return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, cInfo.compactTmpName, getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters); } catch (Exception e) { - // TODO unreserve files for compaction! + // CBUG unreserve files for compaction! throw new RuntimeException(e); } } @Override public void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries) { - // TODO double check w/ java docs that only one thread can remove + // CBUG double check w/ java docs that only one thread can remove CompactionInfo cInfo = externalCompactions.remove(extCompactionId); if (cInfo != null) { + log.debug("Attempting to commit external compaction {}", extCompactionId); // TODO do a sanity check that files exists in dfs? StoredTabletFile metaFile = null; try { metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(cInfo.jobFiles, cInfo.compactTmpName, cInfo.newFile, compactionId, new DataFileValue(fileSize, entries)); + TabletLogger.compacted(getExtent(), cInfo.job, metaFile); } catch (Exception e) { metaFile = null; throw new RuntimeException(e); } finally { completeCompaction(cInfo.job, cInfo.jobFiles, metaFile); } + } else { + log.debug("Ignoring request to commit external compaction that is unknown {}", + extCompactionId); } }