This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit df8a3670a4100bee083a02d6ac6f45ff36ffe8f6
Merge: 3ab133cef0 1d365a9089
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Wed Aug 28 17:53:16 2024 -0400

    Merge branch '3.1'

 .../java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java | 4 +++-
 .../apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java | 2 +-
 .../java/org/apache/accumulo/hadoop/its/mapreduce/MapReduceIT.java  | 5 +++--
 .../minicluster/MiniAccumuloClusterExistingZooKeepersTest.java      | 3 ++-
 .../accumulo/manager/compaction/CompactionCoordinatorTest.java      | 3 ++-
 5 files changed, 11 insertions(+), 6 deletions(-)

diff --cc server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 90c100aa72,0000000000..9967610691
mode 100644,000000..100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@@ -1,617 -1,0 +1,618 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction;
+
++import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
+import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED;
+import static org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator.canReserveCompaction;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.SelectedFiles;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.cache.Caches;
+import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.RunningCompaction;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class CompactionCoordinatorTest {
+
+  // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted
+  private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances =
+      new AtomicReference<>(Map.of());
+
+  private static final CompactorGroupId GROUP_ID = CompactorGroupId.of("R2DQ");
+
+  private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1", 9090);
+
+  public MetricsInfo getMockMetrics() {
+    MetricsInfo metricsInfo = createMock(MetricsInfo.class);
+    metricsInfo.addServiceTags(anyObject(), anyObject(), anyObject());
+    expectLastCall().anyTimes();
+    metricsInfo.addMetricsProducers(anyObject());
+    expectLastCall().anyTimes();
+    metricsInfo.init();
+    expectLastCall().anyTimes();
+    replay(metricsInfo);
+    return metricsInfo;
+  }
+
+  public class TestCoordinator extends CompactionCoordinator {
+
+    private final List<RunningCompaction> runningCompactions;
+
+    private Set<ExternalCompactionId> metadataCompactionIds = null;
+
+    public TestCoordinator(ServerContext ctx, SecurityOperation security,
+        List<RunningCompaction> runningCompactions, Manager manager) {
+      super(ctx, security, fateInstances, manager);
+      this.runningCompactions = runningCompactions;
+    }
+
+    @Override
+    protected int countCompactors(String groupName) {
+      return 3;
+    }
+
+    @Override
+    protected void startDeadCompactionDetector() {}
+
+    @Override
+    protected long getTServerCheckInterval() {
+      return 5000L;
+    }
+
+    @Override
+    protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
+
+    @Override
+    protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) {
+      // This is called from CompactionCoordinator.run(). Counting down
+      // the latch will exit the run method
+      this.shutdown.countDown();
+    }
+
+    @Override
+    public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
+        throws ThriftSecurityException {}
+
+    @Override
+    public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
+        TKeyExtent extent) throws ThriftSecurityException {}
+
+    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
+      metadataCompactionIds = mci;
+    }
+
+    @Override
+    protected Set<ExternalCompactionId> readExternalCompactionIds() {
+      if (metadataCompactionIds == null) {
+        return RUNNING_CACHE.keySet();
+      } else {
+        return metadataCompactionIds;
+      }
+    }
+
+    public Map<ExternalCompactionId,RunningCompaction> getRunning() {
+      return RUNNING_CACHE;
+    }
+
+    public void resetInternals() {
+      getRunning().clear();
+      metadataCompactionIds = null;
+    }
+
+    @Override
+    protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+      return runningCompactions;
+    }
+
+    @Override
+    protected Map<String,Set<HostAndPort>> getRunningCompactors() {
+      return Map.of();
+    }
+
+    @Override
+    protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return createExternalCompactionMetadata(metaJob.getJob(),
+          metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
+              .collect(Collectors.toSet()),
+          metaJob.getTabletMetadata(), compactorAddress, externalCompactionId);
+    }
+
+    @Override
+    protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
+        Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId());
+      FateId fateId = FateId.from(type, UUID.randomUUID());
+      return new CompactionMetadata(jobFiles,
+          new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")),
+          compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, fateId);
+    }
+
+    @Override
+    protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+        CompactionMetadata ecm, MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
+      return new TExternalCompactionJob(externalCompactionId,
+          metaJob.getTabletMetadata().getExtent().toThrift(), List.of(),
+          SystemIteratorUtil.toIteratorConfig(List.of()),
+          ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
+          TCompactionKind.valueOf(ecm.getKind().name()),
+          FateId
+              .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()),
+                  UUID.randomUUID())
+              .toThrift(),
+          Map.of());
+    }
+
+    @Override
+    protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {}
+
+  }
+
+  @Test
+  public void testCoordinatorColdStart() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    MetricsInfo metricsInfo = getMockMetrics();
+    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    EasyMock.verify(context, security, metricsInfo);
+  }
+
+  @Test
+  public void testCoordinatorRestartOneRunningCompaction() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    MetricsInfo metricsInfo = getMockMetrics();
+    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
+
+    List<RunningCompaction> runningCompactions = new ArrayList<>();
+    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
+    TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class);
+    expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
+    TKeyExtent extent = new TKeyExtent();
-     extent.setTable("1".getBytes());
++    extent.setTable("1".getBytes(UTF_8));
+    runningCompactions.add(new RunningCompaction(job, tserverAddr.toString(), GROUP_ID.toString()));
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, job, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, runningCompactions, manager);
+    coordinator.resetInternals();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+
+    Map<ExternalCompactionId,RunningCompaction> running = coordinator.getRunning();
+    Entry<ExternalCompactionId,RunningCompaction> ecomp = running.entrySet().iterator().next();
+    assertEquals(eci, ecomp.getKey());
+    RunningCompaction rc = ecomp.getValue();
+    assertEquals(GROUP_ID.toString(), rc.getGroupName());
+    assertEquals(tserverAddr.toString(), rc.getCompactorAddress());
+
+    EasyMock.verify(context, job, security);
+  }
+
+  @Test
+  public void testGetCompactionJob() throws Exception {
+
+    TableConfiguration tconf = EasyMock.createNiceMock(TableConfiguration.class);
+    expect(tconf.get(Property.TABLE_COMPACTION_CONFIGURER))
+        .andReturn(Property.TABLE_COMPACTION_CONFIGURER.getDefaultValue()).anyTimes();
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    expect(context.getTableConfiguration(TableId.of("2a"))).andReturn(tconf).anyTimes();
+
+    MetricsInfo metricsInfo = getMockMetrics();
+    expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+    expect(context.rpcCreds()).andReturn(creds).anyTimes();
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true).anyTimes();
+
+    KeyExtent ke = new KeyExtent(TableId.of("2a"), new Text("z"), new Text("b"));
+    TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
+    expect(tm.getExtent()).andReturn(ke).anyTimes();
+    expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+    expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(tconf, context, creds, tm, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    // Use coordinator.run() to populate the internal data structures. This is tested in a different
+    // test.
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+
+    // Add a job to the job queue
+    CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(),
+        CompactionKind.SYSTEM, Optional.of(true));
+    coordinator.addJobs(tm, Collections.singleton(job));
+    CompactionJobPriorityQueue queue = coordinator.getJobQueues().getQueue(GROUP_ID);
+    assertEquals(1, queue.getQueuedJobs());
+
+    // Get the next job
+    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
+    TNextCompactionJob nextJob = coordinator.getCompactionJob(new TInfo(), creds,
+        GROUP_ID.toString(), "localhost:10241", eci.toString());
+    assertEquals(3, nextJob.getCompactorCount());
+    TExternalCompactionJob createdJob = nextJob.getJob();
+    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+    assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent()));
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+    Entry<ExternalCompactionId,RunningCompaction> entry =
+        coordinator.getRunning().entrySet().iterator().next();
+    assertEquals(eci.toString(), entry.getKey().toString());
+    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+    assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
+
+    EasyMock.verify(tconf, context, creds, tm, security);
+  }
+
+  @Test
+  public void testGetCompactionJobNoJobs() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true);
+
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, creds, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager);
+    TNextCompactionJob nextJob = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
+        GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
+    assertEquals(3, nextJob.getCompactorCount());
+    assertNull(nextJob.getJob().getExternalCompactionId());
+
+    EasyMock.verify(context, creds, security);
+  }
+
+  @Test
+  public void testCleanUpRunning() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, creds, security, manager);
+
+    TestCoordinator coordinator =
+        new TestCoordinator(context, security, new ArrayList<>(), manager);
+
+    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
+
+    coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
+    coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
+    coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));
+
+    coordinator.cleanUpInternalState();
+
+    assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());
+
+    coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
+
+    coordinator.cleanUpInternalState();
+
+    assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
+
+    EasyMock.verify(context, creds, security);
+
+  }
+
+  @Test
+  public void testCanReserve() throws Exception {
+    TableId tableId1 = TableId.of("5");
+    TableId tableId2 = TableId.of("6");
+
+    var file1 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00001.rf"));
+    var file2 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00002.rf"));
+    var file3 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00003.rf"));
+    var file4 = StoredTabletFile.of(new URI("file:///accumulo/tables/1/default_tablet/F00004.rf"));
+
+    ServerContext context = EasyMock.mock(ServerContext.class);
+    EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce();
+    EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce();
+
+    TableConfiguration tableConf = EasyMock.createMock(TableConfiguration.class);
+    EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION))
+        .andReturn(100L).atLeastOnce();
+
+    EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce();
+
+    FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+
+    CompactorGroupId cgid = CompactorGroupId.of("G1");
+    ReferencedTabletFile tmp1 =
+        ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00005.rf_tmp"));
+    CompactionMetadata cm1 = new CompactionMetadata(Set.of(file1, file2), tmp1, "localhost:4444",
+        CompactionKind.SYSTEM, (short) 5, cgid, false, null);
+
+    ReferencedTabletFile tmp2 =
+        ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/default_tablet/C00006.rf_tmp"));
+    CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555",
+        CompactionKind.USER, (short) 5, cgid, false, fateId1);
+
+    EasyMock.replay(context, tableConf);
+
+    KeyExtent extent1 = new KeyExtent(tableId1, null, null);
+
+    var dfv = new DataFileValue(1000, 100);
+
+    var cid1 = ExternalCompactionId.generate(UUID.randomUUID());
+    var cid2 = ExternalCompactionId.generate(UUID.randomUUID());
+
+    var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1,
+        SteadyTime.from(100, TimeUnit.SECONDS));
+    var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1,
+        SteadyTime.from(100, TimeUnit.SECONDS));
+
+    var time = SteadyTime.from(1000, TimeUnit.SECONDS);
+
+    // should not be able to compact an offline table
+    var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, null))
+        .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
+        .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
+    assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2),
+        context, time));
+
+    // nothing should prevent this compaction
+    var tablet1 =
+        TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv)
+            .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
+    assertTrue(
+        canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+
+    // should not be able to do a user compaction unless selected files are present
+    assertFalse(
+        canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context, time));
+
+    // should not be able to compact a tablet with user compaction request in place
+    var tablet3 =
+        TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv)
+            .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED);
+    assertFalse(
+        canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+
+    // should not be able to compact a tablet when the job has files not present in the tablet
+    var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
+    assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4),
+        context, time));
+
+    // should not be able to compact a tablet with an operation id present
+    TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1);
+    var tablet5 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid)
+        .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
+    assertFalse(
+        canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+
+    // should not be able to compact a tablet if the job files overlaps with running compactions
+    var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1)
+        .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED);
+    assertFalse(
+        canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+    // should be able to compact the file that is outside of the set of files currently compacting
+    assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context, time));
+
+    // create a tablet with a selected set of files
+    var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
+        .build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
+    // 0 completed jobs
+    var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv)
+        .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
+        .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
+
+    // Should be able to start if no completed and overlap
+    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
+        Set.of(file1, file2), context, time));
+    assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM,
+        Set.of(file3, file4), context, time));
+
+    // should not be able to start a system compaction if the set of files overlaps with the
+    // selected files
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2),
+        context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4),
+        context, time));
+    // should be able to start a system compaction on the set of files not in the selected set
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4),
+        context, time));
+    // should be able to start user compactions on files that are selected
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3), context, time));
+    // should not be able to start user compactions on files that fall outside of the selected set
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4),
+        context, time));
+    assertFalse(
+        canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file4), context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3, file4), context, time));
+
+    // test selected files and running compaction
+    var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp)
+        .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED);
+    // should be able to compact files that are in the selected set and not in the running set
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2),
+        context, time));
+    // should not be able to compact because files overlap the running set
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3),
+        context, time));
+    // should not be able to start a system compaction if the set of files overlaps with the
+    // selected files and/or the running set
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2),
+        context, time));
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4),
+        context, time));
+    // should be able to start a system compaction on the set of files not in the selected set
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), context,
+        time));
+
+    // should not be able to compact a tablet that does not exists
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context, time));
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context, time));
+
+    EasyMock.verify(context);
+  }
+}