This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit fabecbd856ccde226f776f6af493ae9b531338f1 Merge: 12166db314 9414041323 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Jun 18 14:31:54 2024 -0400 Merge branch 'main' into elasticity .../apache/accumulo/core/util/cache/Caches.java | 1 + .../util/compaction/ExternalCompactionUtil.java | 9 + .../thrift/CompactionCoordinatorService.java | 44 +- .../core/compaction/thrift/TNextCompactionJob.java | 511 +++++++++++++++++++++ core/src/main/thrift/compaction-coordinator.thrift | 9 +- .../org/apache/accumulo/compactor/Compactor.java | 15 +- .../coordinator/CompactionCoordinator.java | 15 +- .../compaction/CompactionCoordinatorTest.java | 15 +- 8 files changed, 582 insertions(+), 37 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java index 3927f4d0c0,0000000000..5767f8b508 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java +++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java @@@ -1,108 -1,0 +1,109 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.util.cache; + +import static com.google.common.base.Suppliers.memoize; + +import java.util.function.Supplier; + +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Caffeine; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; + +public class Caches implements MetricsProducer { + + public enum CacheName { + BULK_IMPORT_FILE_LENGTHS, + CLASSLOADERS, + COMBINER_LOGGED_MSGS, + COMPACTIONS_COMPLETED, + COMPACTION_CONFIGS, ++ COMPACTOR_COUNTS, + COMPACTION_DIR_CACHE, + COMPACTION_DISPATCHERS, + COMPACTION_SERVICE_ID, + COMPACTOR_GROUP_ID, + COMPRESSION_ALGORITHM, + CRYPT_PASSWORDS, + HOST_REGEX_BALANCER_TABLE_REGEX, + INSTANCE_ID, + NAMESPACE_ID, + NAMESPACE_CONFIGS, + PROP_CACHE, + RECOVERY_MANAGER_PATH_CACHE, + SCAN_SERVER_TABLET_METADATA, + SERVICE_ENVIRONMENT_TABLE_CONFIGS, + SPACE_AWARE_VOLUME_CHOICE, + SPLITTER_FILES, + SPLITTER_STARTING, + SPLITTER_UNSPLITTABLE, + TABLE_CONFIGS, + TABLE_ID, + TABLE_PARENT_CONFIGS, + TABLE_ZOO_HELPER_CACHE, + TSRM_FILE_LENGTHS, + TINYLFU_BLOCK_CACHE, + VOLUME_HDFS_CONFIGS + } + + private static final Logger LOG = LoggerFactory.getLogger(Caches.class); + private static final Supplier<Caches> CACHES = memoize(() -> new Caches()); + + public static Caches getInstance() { + return CACHES.get(); + } + + private MeterRegistry registry = null; + + private Caches() {} + + @Override + public void registerMetrics(MeterRegistry registry) { + this.registry = registry; + } + + private boolean setupMicrometerMetrics(Caffeine<Object,Object> cacheBuilder, String name) { + if (registry != null) { + try { + cacheBuilder.recordStats(() -> new CaffeineStatsCounter(registry, name)); + LOG.trace("Metrics enabled for {} cache.", name); + return true; + } catch (IllegalStateException e) { + // recordStats was already called by the cacheBuilder. 
+ } + } + return false; + } + + public Caffeine<Object,Object> createNewBuilder(CacheName name, boolean emitMetricsIfEnabled) { + Caffeine<Object,Object> cacheBuilder = Caffeine.newBuilder(); + boolean metricsConfigured = false; + if (emitMetricsIfEnabled) { + metricsConfigured = setupMicrometerMetrics(cacheBuilder, name.name()); + } + LOG.trace("Caffeine builder created for {}, metrics enabled: {}", name, metricsConfigured); + return cacheBuilder; + } + +} diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 48895192bd,bbcc3f1529..b52ed9f867 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -45,6 -46,6 +45,7 @@@ import org.apache.accumulo.core.tablets import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.threads.ThreadPools; ++import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@@ -274,9 -274,10 +275,10 @@@ public class ExternalCompactionUtil return runningIds; } - public static int countCompactors(String queueName, ClientContext context) { - long start = System.nanoTime(); - String queueRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + queueName; - List<String> children = context.getZooCache().getChildren(queueRoot); + public static int countCompactors(String groupName, ClientContext context) { ++ var start = NanoTime.now(); + String groupRoot = context.getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + groupName; + List<String> children = context.getZooCache().getChildren(groupRoot); if (children == null) { return 0; } @@@ -290,6 -291,13 +292,13 @@@ } } - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); ++ long elapsed = start.elapsed().toMillis(); + if (elapsed > 100) { - LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, queueName); ++ LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName); + } else { - LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, queueName); ++ LOG.trace("Took {} ms to count {} compactors for {}", elapsed, count, groupName); + } + return count; } diff --cc core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java index c7da84ef01,7b8af0438b..2870290607 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java @@@ -31,7 -31,7 +31,7 @@@ public class CompactionCoordinatorServi public void compactionCompleted(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.accumulo.core.tabletserver.thrift.TCompactionStats stats) throws org.apache.thrift.TException; - public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, 
java.lang.String externalCompactionId) throws org.apache.thrift.TException; - public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException; ++ public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException; public void updateCompactionStatus(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp) throws org.apache.thrift.TException; @@@ -49,7 -49,7 +49,7 @@@ public void compactionCompleted(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.accumulo.core.tabletserver.thrift.TCompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; - public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException; - public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException; ++ public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException; public void updateCompactionStatus(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; @@@ -111,9 -111,9 +111,9 @@@ } @Override - public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException - public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, 
java.lang.String externalCompactionId) throws org.apache.thrift.TException ++ public TNextCompactionJob getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId) throws org.apache.thrift.TException { - send_getCompactionJob(tinfo, credentials, queueName, compactor, externalCompactionId); + send_getCompactionJob(tinfo, credentials, groupName, compactor, externalCompactionId); return recv_getCompactionJob(); } @@@ -328,20 -328,20 +328,20 @@@ } @Override - public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException { - public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException { ++ public void getCompactionJob(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException { checkReady(); - getCompactionJob_call method_call = new getCompactionJob_call(tinfo, credentials, queueName, compactor, externalCompactionId, resultHandler, this, ___protocolFactory, ___transport); + getCompactionJob_call method_call = new getCompactionJob_call(tinfo, credentials, groupName, compactor, externalCompactionId, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } - public static class getCompactionJob_call extends org.apache.thrift.async.TAsyncMethodCall<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> { + public static class getCompactionJob_call extends org.apache.thrift.async.TAsyncMethodCall<TNextCompactionJob> { private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo; private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; - private java.lang.String queueName; + private java.lang.String groupName; private java.lang.String compactor; private java.lang.String externalCompactionId; - public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, [...] 
- public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String queueName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport [...] ++ public getCompactionJob_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String groupName, java.lang.String compactor, java.lang.String externalCompactionId, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport [...] super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; @@@ -955,8 -955,8 +955,8 @@@ } @Override - public void start(I iface, getCompactionJob_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException { + public void start(I iface, getCompactionJob_args args, org.apache.thrift.async.AsyncMethodCallback<TNextCompactionJob> resultHandler) throws org.apache.thrift.TException { - iface.getCompactionJob(args.tinfo, args.credentials, args.queueName, args.compactor, args.externalCompactionId,resultHandler); + iface.getCompactionJob(args.tinfo, args.credentials, args.groupName, args.compactor, args.externalCompactionId,resultHandler); } } diff --cc core/src/main/thrift/compaction-coordinator.thrift index 8f9dbe9ce6,e5ff211393..d8d000bbd1 --- a/core/src/main/thrift/compaction-coordinator.thrift +++ b/core/src/main/thrift/compaction-coordinator.thrift @@@ -77,10 -84,10 +84,10 @@@ service CompactionCoordinatorService /* * Called by Compactor to get the next compaction job */ - tabletserver.TExternalCompactionJob getCompactionJob( + TNextCompactionJob getCompactionJob( 1:client.TInfo tinfo 2:security.TCredentials credentials - 3:string queueName + 3:string groupName 4:string compactor 5:string externalCompactionId ) diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index a39a6ca1c7,57f9f87978..5e2b0e6cd0 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -667,10 -653,8 +668,7 @@@ public class Compactor extends Abstract return UUID::randomUUID; } - protected long getWaitTimeBetweenCompactionChecks() { - // get the total number of compactors assigned to this group - int numCompactors = - ExternalCompactionUtil.countCompactors(this.getResourceGroup(), getContext()); + protected long getWaitTimeBetweenCompactionChecks(int numCompactors) { - // get the total number of compactors assigned to this queue long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME); // Aim for around 3 compactors checking in per min wait time. long sleepTime = numCompactors * minWait / 3; @@@ -736,20 -707,13 +734,21 @@@ err.set(null); JOB_HOLDER.reset(); + if (System.currentTimeMillis() > nextSortLogsCheckTime) { + // Attempt to process all existing log sorting work serially in this thread. 
+ // When no work remains, this call will return so that we can look for compaction + // work. + LOG.debug("Checking to see if any recovery logs need sorting"); + nextSortLogsCheckTime = logSorter.sortLogsIfNeeded(); + } + TExternalCompactionJob job; try { - job = getNextJob(getNextId()); + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.queueName); + LOG.trace("No external compactions in group {}", this.getResourceGroup()); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks()); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); continue; } if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index f9872c3687,0000000000..355d5a8a5e mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@@ -1,1109 -1,0 +1,1118 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; ++import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.CompactableFileImpl; 
+import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; +import org.apache.accumulo.server.compaction.CompactionPluginUtils; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +public class CompactionCoordinator + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { + + private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class); + + /* + * Map of compactionId to RunningCompactions. This is an informational cache of what external + * compactions may be running. It's possible it may contain external compactions that are not + * actually running. It may not contain compactions that are actually running. The metadata table + * is the most authoritative source of what external compactions are currently running, but it + * does not have the stats that this map has. + */ + protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = + new ConcurrentHashMap<>(); + + /* Map of group name to the last time a compactor called to get a compaction job */ + // ELASTICITY_TODO #4403 need to clean out groups that are no longer configured. + private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + + private final ServerContext ctx; + private final SecurityOperation security; + private final String resourceGroupName; + private final CompactionJobQueues jobQueues; + private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; + // Exposed for tests + protected CountDownLatch shutdown = new CountDownLatch(1); + + private final ScheduledThreadPoolExecutor schedExecutor; + + private final Cache<ExternalCompactionId,RunningCompaction> completed; + private final LoadingCache<FateId,CompactionConfig> compactionConfigCache; + private final Cache<Path,Integer> tabletDirCache; + private final DeadCompactionDetector deadCompactionDetector; + + private QueueMetrics queueMetrics; + private final Manager manager; + ++ private final LoadingCache<String,Integer> compactorCounts; ++ + public CompactionCoordinator(ServerContext ctx, SecurityOperation security, + AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, + final String resourceGroupName, Manager manager) { + this.ctx = ctx; + this.schedExecutor = this.ctx.getScheduledExecutor(); + this.security = security; + this.resourceGroupName = resourceGroupName; + this.manager = Objects.requireNonNull(manager); + + this.jobQueues = new CompactionJobQueues( + ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); + + this.queueMetrics = new QueueMetrics(jobQueues); + + this.fateInstances = fateInstances; + + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) + .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); + + CacheLoader<FateId,CompactionConfig> loader = + fateId -> CompactionConfigStorage.getConfig(ctx, fateId); + + // Keep a small, short-lived cache of compaction config. Compaction config never changes; however, + // when a compaction is canceled it is deleted, which is why there is a time limit. It does not + // hurt to let a job that was canceled start; it will be canceled later. Caching this immutable + // config will help avoid reading the same data over and over.
+ compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true) + .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader); + + Weigher<Path,Integer> weigher = (path, count) -> { + return path.toUri().toString().length(); + }; + + tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true) + .maximumWeight(10485760L).weigher(weigher).build(); + + deadCompactionDetector = + new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances); ++ ++ compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false) ++ .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors); + // At this point the manager does not have its lock so no actions should be taken yet + } + ++ protected int countCompactors(String groupName) { ++ return ExternalCompactionUtil.countCompactors(groupName, ctx); ++ } ++ + private volatile Thread serviceThread = null; + + public void start() { + serviceThread = Threads.createThread("CompactionCoordinator Thread", this); + serviceThread.start(); + } + + public void shutdown() { + shutdown.countDown(); + var localThread = serviceThread; + if (localThread != null) { + try { + localThread.join(); + } catch (InterruptedException e) { + LOG.error("Exception stopping compaction coordinator thread", e); + } + } + } + + protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startIdleCompactionWatcher() { + + ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning, + getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void idleCompactionWarning() { + + long now = System.currentTimeMillis(); + Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && jobQueues.getQueuedJobs(groupName) > 0 + && idleCompactors.containsKey(groupName.canonical())) { + LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName, + getMissingCompactorWarningTime()); + } + }); + + } + + @Override + public void run() { + startCompactionCleaner(schedExecutor); + startRunningCleaner(schedExecutor); + + // On a re-start of the coordinator it's possible that external compactions are in-progress. + // Attempt to get the running compactions on the compactors and then resolve which tserver + // the external compaction came from to re-populate the RUNNING collection. 
+ LOG.info("Checking for running external compactions"); + // On re-start contact the running Compactors to try and seed the list of running compactions + List<RunningCompaction> running = getCompactionsRunningOnCompactors(); + if (running.isEmpty()) { + LOG.info("No running external compactions found"); + } else { + LOG.info("Found {} running external compactions", running.size()); + running.forEach(rc -> { + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.IN_PROGRESS); + update.setMessage("Coordinator restarted, compaction found in progress"); + rc.addUpdate(System.currentTimeMillis(), update); + RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); + }); + } + + startDeadCompactionDetector(); + + startIdleCompactionWatcher(); + + try { + shutdown.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for shutdown latch.", e); + } + + LOG.info("Shutting down"); + } + + private Map<String,Set<HostAndPort>> getIdleCompactors() { + + Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); + ExternalCompactionUtil.getCompactorAddrs(ctx) + .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList))); + + Set<String> emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + Set<HostAndPort> busyCompactors = allCompactors.get(rc.getGroupName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getGroupName()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + + protected void startDeadCompactionDetector() { + deadCompactionDetector.start(); + } + + protected long getMissingCompactorWarningTime() { + return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; + } + + protected long getTServerCheckInterval() { + return this.ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); + } + + public long getNumRunningCompactions() { + return RUNNING_CACHE.size(); + } + + /** + * Return the next compaction job from the queue to a Compactor + * + * @param groupName group + * @param compactorAddress compactor address + * @throws ThriftSecurityException when permission error + * @return compaction job + */ + @Override - public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, ++ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactorAddress, String externalCompactionId) + throws ThriftSecurityException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + CompactorGroupId groupId = CompactorGroupId.of(groupName); + LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); + + TExternalCompactionJob result = null; + + CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); + + while (metaJob != null) { + + Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob); 
+ + // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method + CompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. When compactions are canceled the + // config is deleted. + var cid = ExternalCompactionId.from(externalCompactionId); + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, cid); + } + + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + // It is possible that by the time this is added, the compactor that made this request + // is dead. In that case the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, groupName)); + TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, + metaJob.getJob()); + break; + } else { + LOG.debug( + "Unable to reserve compaction job for {}, pulling another off the queue for group {}", + metaJob.getTabletMetadata().getExtent(), groupName); + metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + } + } + + if (metaJob == null) { + LOG.debug("No jobs found in group {} ", groupName); + } + + if (result == null) { + LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, + compactorAddress); + result = new TExternalCompactionJob(); + } + - return result; - ++ return new TNextCompactionJob(result, compactorCounts.get(groupName)); + } + + @VisibleForTesting + public static boolean canReserveCompaction(TabletMetadata tablet, CompactionKind kind, + Set<StoredTabletFile> jobFiles, ServerContext ctx, SteadyTime steadyTime) { + + if (tablet == null) { + // the tablet no longer exists + return false; + } + + if (tablet.getOperationId() != null) { + return false; + } + + if (ctx.getTableState(tablet.getTableId()) != TableState.ONLINE) { + return false; + } + + if (!tablet.getFiles().containsAll(jobFiles)) { + return false; + } + + var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream() + .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet()); + + if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) { + return false; + } + + switch (kind) { + case SYSTEM: + var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); + if (userRequestedCompactions > 0) { + LOG.debug( + "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions", + tablet.getExtent(), userRequestedCompactions); + return false; + } else if (!Collections.disjoint(jobFiles, + getFilesReservedBySelection(tablet, steadyTime, ctx))) { + return false; + } + break; + case USER: + if (tablet.getSelectedFiles() == null + || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) { + return false; + } + break; + default: + throw new UnsupportedOperationException("Not currently handling " + kind); + } + + return true; + } + + private void checkTabletDir(KeyExtent extent, Path path) { + try { + if (tabletDirCache.getIfPresent(path) == null) { + FileStatus[] files = null; + try { + files = ctx.getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + LOG.debug("Tablet {} had no dir, creating {}", extent, path); + + ctx.getVolumeManager().mkdirs(path); + } +
tabletDirCache.put(path, 1); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, + Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress, + ExternalCompactionId externalCompactionId) { + boolean propDels; + + FateId fateId = null; + + switch (job.getKind()) { + case SYSTEM: { + boolean compactingAll = tablet.getFiles().equals(jobFiles); + propDels = !compactingAll; + } + break; + case USER: { + boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() + && tablet.getSelectedFiles().getFiles().equals(jobFiles); + propDels = !compactingAll; + fateId = tablet.getSelectedFiles().getFateId(); + } + break; + default: + throw new IllegalArgumentException(); + } + + Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir)); + ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, + tablet, directoryCreator, externalCompactionId); + + return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), + job.getPriority(), job.getGroup(), propDels, fateId); + + } + + protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + + Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM + || metaJob.getJob().getKind() == CompactionKind.USER); + + var tabletMetadata = metaJob.getTabletMetadata(); + + var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) + .collect(Collectors.toSet()); + + Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (retry.canRetry()) { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = metaJob.getTabletMetadata().getExtent(); + + if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx, + manager.getSteadyTime())) { + return null; + } + + var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, + compactorAddress, externalCompactionId); + + // any data that is read from the tablet to make a decision about whether it can compact + // must be included in the requireSame call + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tabletMetadata, FILES, SELECTED, ECOMP); + + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + var selectedFiles = tabletMetadata.getSelectedFiles(); + var reserved = getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx); + + // If there is a selectedFiles column, and the reserved set is empty, this means that + // either no user jobs have completed yet or the selection expiration time has passed, + // so the column is eligible to be deleted and a system job can run instead + if (selectedFiles != null && reserved.isEmpty() + && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) { + LOG.debug("Deleting user compaction selected files for {} {}", extent, + externalCompactionId); + tabletMutator.deleteSelectedFiles(); + } + } + + tabletMutator.putExternalCompaction(externalCompactionId, ecm); + tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); + + var result =
tabletsMutator.process().get(extent); + + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + tabletMetadata = result.readMetadata(); + } + } + + retry.useRetry(); + try { + retry.waitForNextAttempt(LOG, + "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return null; + } + + protected TExternalCompactionJob createThriftJob(String externalCompactionId, + CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob, + Optional<CompactionConfig> compactionConfig) { + + Set<CompactableFile> selectedFiles; + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + selectedFiles = Set.of(); + } else { + selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream() + .map(file -> new CompactableFileImpl(file, + metaJob.getTabletMetadata().getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } + + Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, + metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles); + + IteratorConfig iteratorSettings = SystemIteratorUtil + .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of())); + + var files = ecm.getJobFiles().stream().map(storedTabletFile -> { + var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); + return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), + dfv.getTime()); + }).collect(toList()); + + // The fateId here corresponds to the Fate transaction that is driving a user-initiated + // compaction. A system-initiated compaction has no Fate transaction driving it, so it's ok to set + // it to null. If anything tries to use the id for a system compaction and triggers an NPE it's + // probably a bug that needs to be fixed. + FateId fateId = null; + if (metaJob.getJob().getKind() == CompactionKind.USER) { + fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); + } + + return new TExternalCompactionJob(externalCompactionId, + metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, + ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + TCompactionKind.valueOf(ecm.getKind().name()), fateId == null ?
null : fateId.toThrift(), + overrides); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount) + .tag("subprocess", "compaction.coordinator") + .description("Number of queued major compactions").register(registry); + Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions) + .tag("subprocess", "compaction.coordinator") + .description("Number of running major compactions").register(registry); + + queueMetrics.registerMetrics(registry); + } + + public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + jobQueues.add(tabletMetadata, jobs); + } + + public CompactionCoordinatorService.Iface getThriftService() { + return this; + } + + private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { + if (metaJob.getJob().getKind() == CompactionKind.USER + && metaJob.getTabletMetadata().getSelectedFiles() != null) { + var cconf = + compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId()); + return Optional.ofNullable(cconf); + } + return Optional.empty(); + } + + /** + * Compactors call this method when they have finished a compaction. This method does the + * following. + * + * <ol> + * <li>Reads the tablet's metadata and determines if the compaction can commit. It's possible that + * things changed while the compaction was running and it can no longer commit.</li> + * <li>Commits the compaction using a conditional mutation. If the tablet's files or location + * changed since reading the tablet's metadata, then the conditional mutation will fail. When this + * happens, it will reread the metadata and go back to step 1 conceptually. When committing a + * compaction, the compacted files are removed and scan entries are added to the tablet in case the + * files are in use; this prevents GC from deleting the files between updating tablet metadata and + * refreshing the tablet. The scan entries are only added when a tablet has a location.</li> + * <li>After a successful commit, a refresh request is sent to the tablet if it has a location. This + * will cause the tablet to start using the newly compacted files for future scans. Also, the + * tablet can delete the scan entries if there are no active scans using them.</li> + * </ol> + * + * <p> + * User compactions will be refreshed as part of the fate operation. The user compaction fate + * operation will see the compaction was committed after this code updates the tablet metadata; + * however, if it were to rely on this code to do the refresh, it would not be able to know when the + * refresh was actually done. Therefore, user compactions will refresh as part of the fate + * operation so that it's known to be done before the fate operation returns. Since the fate + * operation will do it, there is no need to do it here for user compactions.
+ * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param textent tablet extent + * @param stats compaction stats + * @throws ThriftSecurityException when permission error + */ + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + // maybe fate has not started yet + var localFates = fateInstances.get(); + while (localFates == null) { + UtilWaitThread.sleep(100); + if (shutdown.getCount() == 0) { + return; + } + localFates = fateInstances.get(); + } + + var extent = KeyExtent.fromThrift(textent); + var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId())); + + LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, + extent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + + var tabletMeta = + ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + + var tableState = manager.getContext().getTableState(extent.tableId()); + if (tableState != TableState.ONLINE) { + // It's important this check is done after the compaction id is set in the metadata table to + // avoid race conditions with the client code that waits for tables to go offline. That code + // looks for compaction ids in the metadata table after setting the table state. When that + // client code sees nothing for a tablet, it's important that nothing changes the tablet's + // files after that point in time, which this check ensures. + LOG.debug("Not committing compaction {} for {} because of table state {}", ecid, extent, + tableState); + // clean up metadata table and files related to the compaction + compactionsFailed(Map.of(ecid, extent)); + return; + } + + if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { + return; + } + + // Start a fate transaction to commit the compaction.
+ CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); + var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); + var txid = localFate.seedTransaction("COMMIT_COMPACTION", FateKey.forCompactionCommit(ecid), + renameOp, true, "Commit compaction " + ecid); + + txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} {}", ecid, fateId), + () -> LOG.debug("compaction commit already initiated for {}", ecid)); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); + LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + } + + void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + // Need to process each level by itself because the conditional tablet mutator does not support + // mutating multiple data levels at the same time + compactions.entrySet().stream() + .collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()), + Collectors.toMap(Entry::getKey, Entry::getValue))) + .forEach((level, compactionsByLevel) -> compactionFailedForLevel(compactionsByLevel)); + } + + void compactionFailedForLevel(Map<ExternalCompactionId,KeyExtent> compactions) { + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + compactions.forEach((ecid, extent) -> { + try { + ctx.requireNotDeleted(extent.tableId()); + tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) + .deleteExternalCompaction(ecid).submit(new RejectionHandler() { + + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } + + @Override + public boolean test(TabletMetadata tabletMetadata) { + return tabletMetadata == null + || !tabletMetadata.getExternalCompactions().containsKey(ecid); + } + + }); + } catch (TableDeletedException e) { + LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", + extent.tableId()); + } + }); + + final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>(); + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + + // this should try again later when the dead compaction detector runs; let's log it in case + // it's a persistent problem + if (LOG.isDebugEnabled()) { + var ecid = + compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent)) + .findFirst().map(Map.Entry::getKey).orElse(null); + LOG.debug("Unable to remove failed compaction {} {}", extent, ecid); + } + } else { + // compactionFailed is called from the Compactor when either a compaction fails or + // is cancelled, and it's called from the DeadCompactionDetector. This block is + // entered when the conditional mutator above successfully deletes an ecid from + // the tablet metadata. Remove compaction tmp files from the tablet directory + // that have a corresponding ecid in the name.
+ + ecidsForTablet.clear(); + compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0) + .map(Entry::getKey).forEach(ecidsForTablet::add); + + if (!ecidsForTablet.isEmpty()) { + final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); + if (tm != null) { + final Collection<Volume> vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = + vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> { + return path.getName().endsWith(fileSuffix); + }); + if (files.length > 0) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); + } + } + } else { + // TabletMetadata does not exist for the extent. This could be due to a merge or + // split operation. Use the utility to find tmp files at the table level. + deadCompactionDetector.addTableId(extent.tableId()); + } + } + } + }); + } + + compactions.forEach((k, v) -> recordCompletion(k)); + } + + /** + * Called by a Compactor to update the status of the assigned compaction + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param update compaction status update + * @param timestamp timestamp of the message + * @throws ThriftSecurityException when permission error + */ + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId, + timestamp, update); + final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + if (null != rc) { + rc.addUpdate(timestamp, update); + } + } + + public void recordCompletion(ExternalCompactionId ecid) { + var rc = RUNNING_CACHE.remove(ecid); + if (rc != null) { + completed.put(ecid, rc); + } + } + + protected Set<ExternalCompactionId> readExternalCompactionIds() { + try (TabletsMetadata tabletsMetadata = + this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) + .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { + return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + } + + /** + * The RUNNING_CACHE set may contain external compactions that are not actually running. This + * method periodically cleans those up. + */ + public void cleanUpRunning() { + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata.
+    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
+
+    // grab the ids that are listed as running in the metadata table. It is important that this
+    // is done after getting the snapshot.
+    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
+
+    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
+
+    // remove ids that are in the running set but not in the metadata table
+    idsToRemove.forEach(this::recordCompletion);
+
+    if (!idsToRemove.isEmpty()) {
+      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+    }
+  }
+
+  /**
+   * Return information about running compactions
+   *
+   * @param tinfo trace info
+   * @param credentials tcredentials object
+   * @return map of ECID to TExternalCompaction objects
+   * @throws ThriftSecurityException when a permission error occurs
+   */
+  @Override
+  public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
+      throws ThriftSecurityException {
+    // do not expect users to call this directly; expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+
+    final TExternalCompactionList result = new TExternalCompactionList();
+    RUNNING_CACHE.forEach((ecid, rc) -> {
+      TExternalCompaction trc = new TExternalCompaction();
+      trc.setGroupName(rc.getGroupName());
+      trc.setCompactor(rc.getCompactorAddress());
+      trc.setUpdates(rc.getUpdates());
+      trc.setJob(rc.getJob());
+      result.putToCompactions(ecid.canonical(), trc);
+    });
+    return result;
+  }
+
+  /**
+   * Return information about recently completed compactions
+   *
+   * @param tinfo trace info
+   * @param credentials tcredentials object
+   * @return map of ECID to TExternalCompaction objects
+   * @throws ThriftSecurityException when a permission error occurs
+   */
+  @Override
+  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
+      throws ThriftSecurityException {
+    // do not expect users to call this directly; expect other tservers to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+    final TExternalCompactionList result = new TExternalCompactionList();
+    completed.asMap().forEach((ecid, rc) -> {
+      TExternalCompaction trc = new TExternalCompaction();
+      trc.setGroupName(rc.getGroupName());
+      trc.setCompactor(rc.getCompactorAddress());
+      trc.setJob(rc.getJob());
+      trc.setUpdates(rc.getUpdates());
+      result.putToCompactions(ecid.canonical(), trc);
+    });
+    return result;
+  }
+
+  @Override
+  public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
+      throws TException {
+    var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
+    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
+    try {
+      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
+      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
+        throw new AccumuloSecurityException(credentials.getPrincipal(),
+            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+      }
+    } catch (TableNotFoundException e) {
+      throw new ThriftTableOperationException(extent.tableId().canonical(), null,
+          TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage());
+    }
+
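+    // permissions verified above; forward the cancel request to the compactor process that is
+    // running this compaction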
+    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
+  }
+
+  /* Method exists to be called from test */
+  public CompactionJobQueues getJobQueues() {
+    return jobQueues;
+  }
+
+  /* Method exists to be overridden in test to hide static method */
+  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
+  }
+
+  /* Method exists to be overridden in test to hide static method */
+  protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
+    HostAndPort hostPort = HostAndPort.fromString(address);
+    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
+  }
+
+  private void deleteEmpty(ZooReaderWriter zoorw, String path)
+      throws KeeperException, InterruptedException {
+    try {
+      LOG.debug("Deleting empty ZK node {}", path);
+      zoorw.delete(path);
+    } catch (KeeperException.NotEmptyException e) {
+      LOG.debug("Failed to delete {}, it's not empty; likely an expected race condition.", path);
+    }
+  }
+
+  private void cleanUpCompactors() {
+    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+
+    var zoorw = this.ctx.getZooReaderWriter();
+
+    try {
+      var groups = zoorw.getChildren(compactorQueuesPath);
+
+      for (String group : groups) {
+        String qpath = compactorQueuesPath + "/" + group;
+
+        var compactors = zoorw.getChildren(qpath);
+
+        if (compactors.isEmpty()) {
+          deleteEmpty(zoorw, qpath);
+        }
+
+        for (String compactor : compactors) {
+          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
+          var lockNodes = zoorw.getChildren(cpath);
+          if (lockNodes.isEmpty()) {
+            deleteEmpty(zoorw, cpath);
+          }
+        }
+      }
+
+    } catch (KeeperException | RuntimeException e) {
+      LOG.warn("Failed to clean up compactors", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static Set<StoredTabletFile> getFilesReservedBySelection(TabletMetadata tabletMetadata,
+      SteadyTime steadyTime, ServerContext ctx) {
+    if (tabletMetadata.getSelectedFiles() == null) {
+      return Set.of();
+    }
+
+    if (tabletMetadata.getSelectedFiles().getCompletedJobs() > 0) {
+      return tabletMetadata.getSelectedFiles().getFiles();
+    }
+
+    long selectedExpirationDuration = ctx.getTableConfiguration(tabletMetadata.getTableId())
+        .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION);
+
+    if (steadyTime.minus(tabletMetadata.getSelectedFiles().getSelectedTime()).toMillis()
+        < selectedExpirationDuration) {
+      return tabletMetadata.getSelectedFiles().getFiles();
+    }
+
+    return Set.of();
+  }
+}
diff --cc server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 962ac18a13,0000000000..5c7a913d91
mode 100644,000000..100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@@ -1,606 -1,0 +1,615 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; +import static org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator.canReserveCompaction; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; ++import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; 
+import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.junit.jupiter.api.Test; + +import com.google.common.net.HostAndPort; + +public class CompactionCoordinatorTest { + + // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted + private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances = + new AtomicReference<>(Map.of()); + + private static final CompactorGroupId GROUP_ID = CompactorGroupId.of("R2DQ"); + + private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1", 9090); + + public MetricsInfo getMockMetrics() { + MetricsInfo metricsInfo = createMock(MetricsInfo.class); + metricsInfo.addServiceTags(anyObject(), anyObject(), anyObject()); + expectLastCall().anyTimes(); + metricsInfo.addMetricsProducers(anyObject()); + expectLastCall().anyTimes(); + metricsInfo.init(); + expectLastCall().anyTimes(); + replay(metricsInfo); + return metricsInfo; + } + + public class TestCoordinator extends CompactionCoordinator { + + private final List<RunningCompaction> runningCompactions; + + private Set<ExternalCompactionId> metadataCompactionIds = null; + + public TestCoordinator(ServerContext ctx, SecurityOperation security, + List<RunningCompaction> runningCompactions, Manager manager) { + super(ctx, security, fateInstances, "TEST_GROUP", manager); + this.runningCompactions = runningCompactions; + } + ++ @Override ++ protected int countCompactors(String groupName) { ++ return 3; ++ } ++ + @Override + protected void startDeadCompactionDetector() {} + + @Override + protected long getTServerCheckInterval() { + return 5000L; + } + + @Override + protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} + + @Override + protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {} + + @Override + protected void startIdleCompactionWatcher() { + // This is called from CompactionCoordinator.run(). 
Counting down
+      // the latch will exit the run method
+      this.shutdown.countDown();
+    }
+
+    @Override
+    public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
+        throws ThriftSecurityException {}
+
+    @Override
+    public void compactionFailed(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent extent) throws ThriftSecurityException {}
+
+    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
+      metadataCompactionIds = mci;
+    }
+
+    @Override
+    protected Set<ExternalCompactionId> readExternalCompactionIds() {
+      if (metadataCompactionIds == null) {
+        return RUNNING_CACHE.keySet();
+      } else {
+        return metadataCompactionIds;
+      }
+    }
+
+    public Map<ExternalCompactionId,RunningCompaction> getRunning() {
+      return RUNNING_CACHE;
+    }
+
+    public void resetInternals() {
+      getRunning().clear();
+      metadataCompactionIds = null;
+    }
+
+    @Override
+    protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+      return runningCompactions;
+    }
+
+    @Override
+    protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return createExternalCompactionMetadata(metaJob.getJob(),
+          metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
+              .collect(Collectors.toSet()),
+          metaJob.getTabletMetadata(), compactorAddress, externalCompactionId);
+    }
+
+    @Override
+    protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
+        Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId());
+      FateId fateId = FateId.from(type, UUID.randomUUID());
+      return new CompactionMetadata(jobFiles,
+          new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")),
+          compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, fateId);
+    }
+
+    @Override
+    protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+        CompactionMetadata ecm, MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
+      return new TExternalCompactionJob(externalCompactionId,
+          metaJob.getTabletMetadata().getExtent().toThrift(), List.of(),
+          SystemIteratorUtil.toIteratorConfig(List.of()),
+          ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
+          TCompactionKind.valueOf(ecm.getKind().name()),
+          FateId
+              .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()),
+                  UUID.randomUUID())
+              .toThrift(),
+          Map.of());
+    }
+
+    @Override
+    protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {}
+
+  }
+
+  @Test
+  public void testCoordinatorColdStart() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    MetricsInfo metricsInfo = getMockMetrics();
+    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, security,
manager); + + var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); + assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); + assertEquals(0, coordinator.getRunning().size()); + coordinator.run(); + coordinator.shutdown(); + + assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); + assertEquals(0, coordinator.getRunning().size()); + EasyMock.verify(context, security, metricsInfo); + } + + @Test + public void testCoordinatorRestartOneRunningCompaction() throws Exception { + + ServerContext context = EasyMock.createNiceMock(ServerContext.class); + expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + + MetricsInfo metricsInfo = getMockMetrics(); + expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + + List<RunningCompaction> runningCompactions = new ArrayList<>(); + ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID()); + TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class); + expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes(); + TKeyExtent extent = new TKeyExtent(); + extent.setTable("1".getBytes()); + runningCompactions.add(new RunningCompaction(job, tserverAddr.toString(), GROUP_ID.toString())); + + AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); + + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); + + EasyMock.replay(context, job, security, manager); + + var coordinator = new TestCoordinator(context, security, runningCompactions, manager); + coordinator.resetInternals(); + assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); + assertEquals(0, coordinator.getRunning().size()); + coordinator.run(); + coordinator.shutdown(); + assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); + assertEquals(1, coordinator.getRunning().size()); + + Map<ExternalCompactionId,RunningCompaction> running = coordinator.getRunning(); + Entry<ExternalCompactionId,RunningCompaction> ecomp = running.entrySet().iterator().next(); + assertEquals(eci, ecomp.getKey()); + RunningCompaction rc = ecomp.getValue(); + assertEquals(GROUP_ID.toString(), rc.getGroupName()); + assertEquals(tserverAddr.toString(), rc.getCompactorAddress()); + + EasyMock.verify(context, job, security); + } + + @Test + public void testGetCompactionJob() throws Exception { + + TableConfiguration tconf = EasyMock.createNiceMock(TableConfiguration.class); + expect(tconf.get(Property.TABLE_COMPACTION_CONFIGURER)) + .andReturn(Property.TABLE_COMPACTION_CONFIGURER.getDefaultValue()).anyTimes(); + + ServerContext context = EasyMock.createNiceMock(ServerContext.class); + expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + expect(context.getTableConfiguration(TableId.of("2a"))).andReturn(tconf).anyTimes(); + + MetricsInfo metricsInfo = getMockMetrics(); + expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + + TCredentials creds = EasyMock.createNiceMock(TCredentials.class); + expect(context.rpcCreds()).andReturn(creds).anyTimes(); + + AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); + 
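+    // getCompactionJob checks canPerformSystemActions, so the mocked security layer grants
+    // system permission here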
+    expect(security.canPerformSystemActions(creds)).andReturn(true).anyTimes();
+
+    KeyExtent ke = new KeyExtent(TableId.of("2a"), new Text("z"), new Text("b"));
+    TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
+    expect(tm.getExtent()).andReturn(ke).anyTimes();
+    expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+    expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(tconf, context, creds, tm, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    // Use coordinator.run() to populate the internal data structures. This is tested in a
+    // different test.
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+
+    // Add a job to the job queue
+    CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(),
+        CompactionKind.SYSTEM, Optional.of(true));
+    coordinator.addJobs(tm, Collections.singleton(job));
+    CompactionJobPriorityQueue queue = coordinator.getJobQueues().getQueue(GROUP_ID);
+    assertEquals(1, queue.getQueuedJobs());
+
+    // Get the next job
+    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
-     TExternalCompactionJob createdJob = coordinator.getCompactionJob(new TInfo(), creds,
++    TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds,
+        GROUP_ID.toString(), "localhost:10241", eci.toString());
++    assertEquals(3, nextJob.getCompactorCount());
++    TExternalCompactionJob createdJob = nextJob.getJob();
+    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+    assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent()));
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+    Entry<ExternalCompactionId,RunningCompaction> entry =
+        coordinator.getRunning().entrySet().iterator().next();
+    assertEquals(eci.toString(), entry.getKey().toString());
+    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+    assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
+
+    EasyMock.verify(tconf, context, creds, tm, security);
+  }
+
+  @Test
+  public void testGetCompactionJobNoJobs() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true);
+
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, creds, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
-     TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
++    TNextCompactionJob nextJob =
coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, + GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); - assertNull(job.getExternalCompactionId()); ++ assertEquals(3, nextJob.getCompactorCount()); ++ assertNull(nextJob.getJob().getExternalCompactionId()); + + EasyMock.verify(context, creds, security); + } + + @Test + public void testCleanUpRunning() throws Exception { + + ServerContext context = EasyMock.createNiceMock(ServerContext.class); + expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes(); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + + TCredentials creds = EasyMock.createNiceMock(TCredentials.class); + + AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); + + EasyMock.replay(context, creds, security, manager); + + TestCoordinator coordinator = + new TestCoordinator(context, security, new ArrayList<>(), manager); + + var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); + var ecid3 = ExternalCompactionId.generate(UUID.randomUUID()); + + coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction())); + coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction())); + coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction())); + + coordinator.cleanUpRunning(); + + assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet()); + + coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2)); + + coordinator.cleanUpRunning(); + + assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet()); + + EasyMock.verify(context, creds, security); + + } + + @Test + public void testCanReserve() throws Exception { + TableId tableId1 = TableId.of("5"); + TableId tableId2 = TableId.of("6"); + + var file1 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00001.rf")); + var file2 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00002.rf")); + var file3 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00003.rf")); + var file4 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00004.rf")); + + ServerContext context = EasyMock.mock(ServerContext.class); + EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce(); + EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce(); + + TableConfiguration tableConf = EasyMock.createMock(TableConfiguration.class); + EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION)) + .andReturn(100L).atLeastOnce(); + + EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce(); + + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + + CompactorGroupId cgid = CompactorGroupId.of("G1"); + ReferencedTabletFile tmp1 = + ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00005.rf_tmp")); + CompactionMetadata cm1 = new CompactionMetadata(Set.of(file1, file2), tmp1, "localhost:4444", + CompactionKind.SYSTEM, (short) 5, cgid, false, null); + + ReferencedTabletFile tmp2 = + ReferencedTabletFile.of(new 
Path("file:///accumulo/tables/1/default_tablet/C00006.rf_tmp")); + CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555", + CompactionKind.USER, (short) 5, cgid, false, fateId1); + + EasyMock.replay(context, tableConf); + + KeyExtent extent1 = new KeyExtent(tableId1, null, null); + + var dfv = new DataFileValue(1000, 100); + + var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); + + var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, + SteadyTime.from(100, TimeUnit.SECONDS)); + var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1, + SteadyTime.from(100, TimeUnit.SECONDS)); + + var time = SteadyTime.from(1000, TimeUnit.SECONDS); + + // should not be able to compact an offline table + var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, null)) + .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) + .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2), + context, time)); + + // nothing should prevent this compaction + var tablet1 = + TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) + .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertTrue( + canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); + + // should not be able to do a user compaction unless selected files are present + assertFalse( + canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context, time)); + + // should not be able to compact a tablet with user compaction request in place + var tablet3 = + TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) + .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED); + assertFalse( + canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); + + // should not be able to compact a tablet when the job has files not present in the tablet + var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4), + context, time)); + + // should not be able to compact a tablet with an operation id present + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); + var tablet5 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid) + .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse( + canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); + + // should not be able to compact a tablet if the job files overlaps with running compactions + var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1) + .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED); + assertFalse( + canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); + // should be able to compact the file that is outside of the set of files currently compacting + 
+    assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context, time));
+
+    // create a tablet with a selected set of files
+    var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
+        .build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
+    // 0 completed jobs
+    var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv)
+        .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
+        .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
+
+    // should be able to start a system compaction when the selection has no completed jobs, even
+    // if the job files overlap the selected files
+    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
+        Set.of(file1, file2), context, time));
+    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
+        Set.of(file3, file4), context, time));
+
+    // should not be able to start a system compaction if the set of files overlaps with the
+    // selected files
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2),
+        context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4),
+        context, time));
+    // should be able to start a system compaction on the set of files not in the selected set
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4),
+        context, time));
+    // should be able to start user compactions on files that are selected
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3), context, time));
+    // should not be able to start user compactions on files that fall outside of the selected set
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4),
+        context, time));
+    assertFalse(
+        canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file4), context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3, file4), context, time));
+
+    // test selected files and running compaction
+    var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
+        .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED);
+    // should be able to compact files that are in the selected set and not in the running set
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2),
+        context, time));
+    // should not be able to compact because files overlap the running set
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3),
+        context, time));
+    // should not be able to start a system compaction if the set of files overlaps with the
+    // selected files and/or the running set
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2),
+        context, time));
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4),
+        context, time));
+    // should be able to start a system compaction on the set of files not in the selected set
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4),
+        context, time));
+
+    // should not be able to compact a tablet that does not exist
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context, time));
+
+    EasyMock.verify(context);
+  }
+}