This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new e62773c Adds external compaction id type e62773c is described below commit e62773c04249211c4a1b7a817c58e5ea74ec96fa Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Mar 12 16:34:59 2021 -0500 Adds external compaction id type --- .../server/compaction/ExternalCompactionId.java | 59 ++++++++++++++++++++++ .../coordinator/CompactionCoordinator.java | 20 ++++---- .../org/apache/accumulo/compactor/Compactor.java | 2 +- .../accumulo/tserver/ThriftClientHandler.java | 7 +-- .../accumulo/tserver/compactions/Compactable.java | 4 +- .../tserver/compactions/CompactionManager.java | 11 ++-- .../tserver/compactions/ExternalCompactionJob.java | 10 ++-- .../accumulo/tserver/tablet/CompactableImpl.java | 9 ++-- 8 files changed, 93 insertions(+), 29 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java new file mode 100644 index 0000000..e6880c4 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.compaction; + +import java.util.UUID; + +import org.apache.accumulo.core.data.AbstractId; + +public class ExternalCompactionId extends AbstractId<ExternalCompactionId> { + + // A common prefix is nice when grepping logs for external compaction ids. The prefix also serves + // as a nice sanity check on data coming in over the network and from persistent storage. + private static final String PREFIX = "ECID:"; + + private ExternalCompactionId(UUID uuid) { + super(PREFIX + uuid); + } + + private ExternalCompactionId(String id) { + super(id); + } + + private static final long serialVersionUID = 1L; + + public static ExternalCompactionId generate() { + return new ExternalCompactionId(UUID.randomUUID()); + } + + public static ExternalCompactionId of(String id) { + if (!id.startsWith(PREFIX)) { + throw new IllegalArgumentException("Not a valid external compaction id " + id); + } + + try { + UUID.fromString(id.substring(PREFIX.length())); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Not a valid external compaction id " + id, e); + } + + return new ExternalCompactionId(id); + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index bc5d4c6..d72bfaa 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -56,6 +56,7 @@ import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.compaction.ExternalCompactionUtil; import org.apache.accumulo.server.compaction.RetryableThriftCall; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; @@ -86,7 +87,8 @@ public class CompactionCoordinator extends AbstractServer /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>(); /* Map of compactionId to RunningCompactions */ - private static final Map<String,RunningCompaction> RUNNING = new ConcurrentHashMap<>(); + private static final Map<ExternalCompactionId,RunningCompaction> RUNNING = + new ConcurrentHashMap<>(); private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final AccumuloConfiguration aconf; @@ -311,10 +313,9 @@ public class CompactionCoordinator extends AbstractServer client = getTabletServerConnection(tserver); TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(), getContext().rpcCreds(), queue, priority, compactorAddress); - RUNNING.put(job.getExternalCompactionId(), + RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), new RunningCompaction(job, compactorAddress, tserver)); - LOG.debug( - "Returning external job id:" + job.externalCompactionId + " to " + compactorAddress); + LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); return job; } finally { ThriftUtil.returnClient(client); @@ -341,7 +342,7 @@ public class CompactionCoordinator extends AbstractServer */ @Override public void cancelCompaction(String externalCompactionId) throws TException { - RunningCompaction rc = RUNNING.get(externalCompactionId); + RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); if (null == rc) { throw new UnknownCompactionIdException(); } @@ -371,7 +372,7 @@ public class CompactionCoordinator extends AbstractServer @Override public List<Status> getCompactionStatus(String externalCompactionId) throws TException { - RunningCompaction rc = RUNNING.get(externalCompactionId); + RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); if (null == rc) { throw new UnknownCompactionIdException(); } @@ -394,7 +395,8 @@ public class CompactionCoordinator extends AbstractServer @Override public void compactionCompleted(String externalCompactionId, CompactionStats stats) throws TException { - RunningCompaction rc = RUNNING.get(externalCompactionId); + var ecid = ExternalCompactionId.of(externalCompactionId); + RunningCompaction rc = RUNNING.get(ecid); if (null != rc) { rc.setStats(stats); } else { @@ -414,7 +416,7 @@ public class CompactionCoordinator extends AbstractServer client = getTabletServerConnection(rc.getTserver()); client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(), externalCompactionId, stats.fileSize, stats.entriesWritten); - RUNNING.remove(externalCompactionId, rc); + RUNNING.remove(ecid, rc); return null; } catch (TException e) { throw e; @@ -462,7 +464,7 @@ public class CompactionCoordinator extends AbstractServer @Override public void updateCompactionStatus(String externalCompactionId, CompactionState state, String message, long timestamp) throws TException { - RunningCompaction rc = RUNNING.get(externalCompactionId); + RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); if (null != rc) { rc.addUpdate(timestamp, message, state); } else { diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 73e790d..b86de52 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -407,7 +407,7 @@ public class Compactor extends AbstractServer cs.setEntriesWritten(stat.getEntriesWritten()); cs.setFileSize(stat.getFileSize()); jobHolder.setStats(cs); - LOG.info("Compaction completed successfully"); + LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); // Update state when completed updateCompactionState(job, CompactionState.SUCCEEDED, "Compaction completed successfully"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index e1897b2..f640e37 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -123,6 +122,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.Compactor; +import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.data.ServerMutation; import org.apache.accumulo.server.fs.TooManyFilesException; @@ -1701,8 +1701,9 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } - server.getCompactionManager().commitExternalCompaction(UUID.fromString(externalCompactionId), - server.getOnlineTablets(), fileSize, entries); + server.getCompactionManager().commitExternalCompaction( + ExternalCompactionId.of(externalCompactionId), server.getOnlineTablets(), fileSize, + entries); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java index cf92d8d..f8cc442 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.UUID; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; @@ -37,6 +36,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.accumulo.server.compaction.ExternalCompactionId; /** * Interface between compaction service and tablet. @@ -93,5 +93,5 @@ public interface Compactable { ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service, CompactionJob job, String compactorId); - void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries); + void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, long entries); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 392c7c1..db4d97d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -42,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics; import org.apache.accumulo.tserver.tablet.Tablet; import org.slf4j.Logger; @@ -75,7 +75,7 @@ public class CompactionManager { private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors; // TODO this may need to be garbage collected... also will need to be populated when tablet load - private Map<UUID,KeyExtent> runningExternalCompactions; + private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions; private class Config { Map<String,String> planners = new HashMap<>(); @@ -405,6 +405,7 @@ public class CompactionManager { return services.values().stream().mapToInt(CompactionService::getCompactionsQueued).sum(); } + // CBUG would be nice to create a CompactorId type and use that instead of string. public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority, String compactorId) { log.debug("Attempting to reserved external compaction queue:{} priority:{} compactor:{}", @@ -414,7 +415,7 @@ public class CompactionManager { var ecJob = extCE.reserveExternalCompaction(priority, compactorId); if (ecJob != null) { runningExternalCompactions.put(ecJob.getExternalCompactionId(), ecJob.getExtent()); - log.debug("Reserved external compaction ecid:{}", ecJob.getExternalCompactionId()); + log.debug("Reserved external compaction {}", ecJob.getExternalCompactionId()); } return ecJob; } @@ -427,8 +428,8 @@ public class CompactionManager { return getExternalExecutor(CompactionExecutorId.externalId(queueName)); } - public void commitExternalCompaction(UUID extCompactionId, Map<KeyExtent,Tablet> currentTablets, - long fileSize, long entries) { + public void commitExternalCompaction(ExternalCompactionId extCompactionId, + Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) { KeyExtent extent = runningExternalCompactions.get(extCompactionId); if (extent != null) { Tablet tablet = currentTablets.get(extent); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index 4e30ea3..e2b04d9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.compactions; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; @@ -35,6 +34,7 @@ import org.apache.accumulo.core.tabletserver.thrift.CompactionType; import org.apache.accumulo.core.tabletserver.thrift.InputFile; import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.server.compaction.ExternalCompactionId; public class ExternalCompactionJob { @@ -42,14 +42,14 @@ public class ExternalCompactionJob { private boolean propogateDeletes; private TabletFile compactTmpName; private KeyExtent extent; - private UUID externalCompactionId; + private ExternalCompactionId externalCompactionId; private long priority; private CompactionKind kind; private List<IteratorSetting> iters; public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes, - TabletFile compactTmpName, KeyExtent extent, UUID externalCompactionId, long priority, - CompactionKind kind, List<IteratorSetting> iters) { + TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId, + long priority, CompactionKind kind, List<IteratorSetting> iters) { this.jobFiles = Objects.requireNonNull(jobFiles); this.propogateDeletes = propogateDeletes; this.compactTmpName = Objects.requireNonNull(compactTmpName); @@ -99,7 +99,7 @@ public class ExternalCompactionJob { org.apache.accumulo.core.tabletserver.thrift.CompactionKind.valueOf(kind.name())); } - public UUID getExternalCompactionId() { + public ExternalCompactionId getExternalCompactionId() { return externalCompactionId; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 3d1f5d5..ecd7083 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -61,6 +60,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException; +import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.tserver.compactions.Compactable; import org.apache.accumulo.tserver.compactions.CompactionManager; @@ -721,7 +721,7 @@ public class CompactableImpl implements Compactable { } // TODO move to top of class - private Map<UUID,CompactionInfo> externalCompactions = new ConcurrentHashMap<>(); + private Map<ExternalCompactionId,CompactionInfo> externalCompactions = new ConcurrentHashMap<>(); @Override public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service, @@ -737,7 +737,7 @@ public class CompactableImpl implements Compactable { cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C"); cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp")); - UUID externalCompactionId = UUID.randomUUID(); + ExternalCompactionId externalCompactionId = ExternalCompactionId.generate(); cInfo.job = job; @@ -755,7 +755,8 @@ public class CompactableImpl implements Compactable { } @Override - public void commitExternalCompaction(UUID extCompactionId, long fileSize, long entries) { + public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, + long entries) { // CBUG double check w/ java docs that only one thread can remove CompactionInfo cInfo = externalCompactions.remove(extCompactionId);