This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223) 3298d6db89 is described below commit 3298d6db89ad5cd2cf0e73173088df317a4d1fe2 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Fri Feb 16 17:02:50 2024 -0500 Add Compaction Job Min & Max Wait properties (#4223) * Add Compaction Min Wait and Max Wait properties * Adds min and max wait properties to configure the min and max wait intervals in the compactor. * Changes the logic in compaction-coordinator to derive the wait period for sending missing-compactor warning messages from the new max wait property (3 x COMPACTOR_MAX_JOB_WAIT_TIME) instead of a fixed 15 minutes. * Also uses the COMPACTOR_MIN_JOB_WAIT_TIME and COMPACTOR_MAX_JOB_WAIT_TIME props as the starting and maximum thrift retry intervals when the compactor is unable to communicate with the compaction-coordinator. --- .../org/apache/accumulo/core/conf/Property.java | 10 ++++++++ .../coordinator/CompactionCoordinator.java | 4 +-- .../coordinator/CompactionCoordinatorTest.java | 21 ++++++++++++++++ .../org/apache/accumulo/compactor/Compactor.java | 21 ++++++++++------ .../apache/accumulo/compactor/CompactorTest.java | 29 ++++++++++++++++++++++ 5 files changed, 75 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 1fa04490fb..08a93ae6b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1433,6 +1433,16 @@ public enum Property { COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"), @Experimental + COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION, + "The minimum amount of time to wait between checks for the next compaction job, backing off" + + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.", + "2.1.3"), + @Experimental + 
COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION, + "Compactors do exponential backoff when their request for work repeatedly come back empty. " + + "This is the maximum amount of time to wait between checks for the next compaction job.", + "2.1.3"), + @Experimental COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN, "If the compactor.port.client is in use, search higher ports until one is available.", "2.1.0"), diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index df94ccb824..337f5bc685 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -101,8 +101,6 @@ public class CompactionCoordinator extends AbstractServer private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); private static final long TIME_BETWEEN_GC_CHECKS = 5000; - private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15); - protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); /* @@ -383,7 +381,7 @@ public class CompactionCoordinator extends AbstractServer } protected long getMissingCompactorWarningTime() { - return FIFTEEN_MINUTES; + return getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; } protected long getTServerCheckInterval() { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 7f46d68e7f..117d50108a 100644 --- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -39,7 +39,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -195,6 +198,24 @@ public class CompactionCoordinatorTest { } + @Test + public void testCoordinatorWarningTime() { + PowerMock.resetAll(); + PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + ServerContext context = PowerMock.createNiceMock(ServerContext.class); + + SiteConfiguration aconf = SiteConfiguration.empty() + .withOverrides(Map.of(Property.COMPACTOR_MAX_JOB_WAIT_TIME.getKey(), "15s")).build(); + ConfigurationCopy config = new ConfigurationCopy(aconf); + expect(context.getConfiguration()).andReturn(config).anyTimes(); + + PowerMock.replay(context); + + var coordinator = new TestCoordinator(null, null, null, null, context, null); + // Should be equal to 3 * 15_000 milliseconds + assertEquals(45_000, coordinator.getMissingCompactorWarningTime()); + } + @Test public void testCoordinatorColdStartNoCompactions() throws Exception { PowerMock.resetAll(); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 893069e920..1434d2a1f1 100644 --- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -447,8 +447,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac * @throws RetriesExceededException thrown when retries have been exceeded */ protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException { + final long startingWaitTime = + getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME); + final long maxWaitTime = + getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME); + RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall = - new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, () -> { + new RetryableThriftCall<>(startingWaitTime, maxWaitTime, 0, () -> { Client coordinatorClient = getCoordinatorClient(); try { ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get()); @@ -587,12 +592,14 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac protected long getWaitTimeBetweenCompactionChecks() { // get the total number of compactors assigned to this queue int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext()); - // Aim for around 3 compactors checking in every second - long sleepTime = numCompactors * 1000L / 3; - // Ensure a compactor sleeps at least around a second - sleepTime = Math.max(1000, sleepTime); - // Ensure a compactor sleep not too much more than 5 mins - sleepTime = Math.min(300_000L, sleepTime); + long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME); + // Aim for around 3 compactors checking in per min wait time. 
+ long sleepTime = numCompactors * minWait / 3; + // Ensure a compactor waits at least the minimum time + sleepTime = Math.max(minWait, sleepTime); + // Ensure a sleeping compactor has a configurable max sleep time + sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME), + sleepTime); // Add some random jitter to the sleep time, that averages out to sleep time. This will spread // compactors out evenly over time. sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * random.nextDouble()); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 174afc8593..2b8fa20b7c 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.net.UnknownHostException; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; @@ -449,4 +451,31 @@ public class CompactorTest { assertEquals(TCompactionState.CANCELLED, c.getLatestState()); } + @Test + public void testCompactionWaitProperty() { + PowerMock.resetAll(); + PowerMock.suppress(PowerMock.methods(Halt.class, "halt")); + PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + + var conf = new 
ConfigurationCopy(DefaultConfiguration.getInstance()); + conf.set(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "800ms"); + + ServerContext context = PowerMock.createNiceMock(ServerContext.class); + expect(context.getConfiguration()).andReturn(conf).anyTimes(); + expect(context.getZooKeeperRoot()).andReturn("test").anyTimes(); + ZooCache zkc = PowerMock.createNiceMock(ZooCache.class); + expect(zkc.getChildren("test/compactors/testQ")).andReturn(List.of("compactor_1")).anyTimes(); + expect(context.getZooCache()).andReturn(zkc).anyTimes(); + + PowerMock.replayAll(); + + SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context, null); + PowerMock.verifyAll(); + + Long maxWait = c.getWaitTimeBetweenCompactionChecks(); + // compaction jitter means maxWait is between 0.9 and 1.1 of the desired value. + assertTrue(maxWait >= 720L); + assertTrue(maxWait <= 968L); + } + }