This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223)
3298d6db89 is described below

commit 3298d6db89ad5cd2cf0e73173088df317a4d1fe2
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Feb 16 17:02:50 2024 -0500

    Add Compaction Job Min & Max Wait properties (#4223)
    
    * Add Compaction Min Wait and Max Wait properties
    
    * Adds min and max wait properties to configure the min and max wait
    intervals in the compactor.
    * Changes the logic in compaction-coordinator to use the new max wait
    property (multiplied by three) when calculating the wait period for
    sending warning messages.
    
    * Also use the MIN_JOB_WAIT_TIME and MAX_JOB_WAIT_TIME props as the
    starting and maximum thrift retry intervals when the compactor is unable
    to communicate with the compaction-coordinator.
---
 .../org/apache/accumulo/core/conf/Property.java    | 10 ++++++++
 .../coordinator/CompactionCoordinator.java         |  4 +--
 .../coordinator/CompactionCoordinatorTest.java     | 21 ++++++++++++++++
 .../org/apache/accumulo/compactor/Compactor.java   | 21 ++++++++++------
 .../apache/accumulo/compactor/CompactorTest.java   | 29 ++++++++++++++++++++++
 5 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1fa04490fb..08a93ae6b3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1433,6 +1433,16 @@ public enum Property {
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo 
compactor server.", "2.1.0"),
   @Experimental
+  COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", 
PropertyType.TIMEDURATION,
+      "The minimum amount of time to wait between checks for the next 
compaction job, backing off"
+          + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
+      "2.1.3"),
+  @Experimental
+  COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", 
PropertyType.TIMEDURATION,
+      "Compactors do exponential backoff when their request for work 
repeatedly come back empty. "
+          + "This is the maximum amount of time to wait between checks for the 
next compaction job.",
+      "2.1.3"),
+  @Experimental
   COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
       "If the compactor.port.client is in use, search higher ports until one 
is available.",
       "2.1.0"),
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index df94ccb824..337f5bc685 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -101,8 +101,6 @@ public class CompactionCoordinator extends AbstractServer
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
-
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
   /*
@@ -383,7 +381,7 @@ public class CompactionCoordinator extends AbstractServer
   }
 
   protected long getMissingCompactorWarningTime() {
-    return FIFTEEN_MINUTES;
+    return 
getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
   }
 
   protected long getTServerCheckInterval() {
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 7f46d68e7f..117d50108a 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -39,7 +39,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -195,6 +198,24 @@ public class CompactionCoordinatorTest {
 
   }
 
+  @Test
+  public void testCoordinatorWarningTime() {
+    PowerMock.resetAll();
+    PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+    ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+
+    SiteConfiguration aconf = SiteConfiguration.empty()
+        .withOverrides(Map.of(Property.COMPACTOR_MAX_JOB_WAIT_TIME.getKey(), 
"15s")).build();
+    ConfigurationCopy config = new ConfigurationCopy(aconf);
+    expect(context.getConfiguration()).andReturn(config).anyTimes();
+
+    PowerMock.replay(context);
+
+    var coordinator = new TestCoordinator(null, null, null, null, context, 
null);
+    // Should be equal to 3 * 15_000 milliseconds
+    assertEquals(45_000, coordinator.getMissingCompactorWarningTime());
+  }
+
   @Test
   public void testCoordinatorColdStartNoCompactions() throws Exception {
     PowerMock.resetAll();
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 893069e920..1434d2a1f1 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -447,8 +447,13 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
    * @throws RetriesExceededException thrown when retries have been exceeded
    */
   protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws 
RetriesExceededException {
+    final long startingWaitTime =
+        
getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
+    final long maxWaitTime =
+        
getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME);
+
     RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
-        new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, 
() -> {
+        new RetryableThriftCall<>(startingWaitTime, maxWaitTime, 0, () -> {
           Client coordinatorClient = getCoordinatorClient();
           try {
             ExternalCompactionId eci = 
ExternalCompactionId.generate(uuid.get());
@@ -587,12 +592,14 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
   protected long getWaitTimeBetweenCompactionChecks() {
     // get the total number of compactors assigned to this queue
     int numCompactors = ExternalCompactionUtil.countCompactors(queueName, 
getContext());
-    // Aim for around 3 compactors checking in every second
-    long sleepTime = numCompactors * 1000L / 3;
-    // Ensure a compactor sleeps at least around a second
-    sleepTime = Math.max(1000, sleepTime);
-    // Ensure a compactor sleep not too much more than 5 mins
-    sleepTime = Math.min(300_000L, sleepTime);
+    long minWait = 
getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
+    // Aim for around 3 compactors checking in per min wait time.
+    long sleepTime = numCompactors * minWait / 3;
+    // Ensure a compactor waits at least the minimum time
+    sleepTime = Math.max(minWait, sleepTime);
+    // Ensure a sleeping compactor has a configurable max sleep time
+    sleepTime = 
Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
+        sleepTime);
     // Add some random jitter to the sleep time, that averages out to sleep 
time. This will spread
     // compactors out evenly over time.
     sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * random.nextDouble());
diff --git 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 174afc8593..2b8fa20b7c 100644
--- 
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ 
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.net.UnknownHostException;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
@@ -449,4 +451,31 @@ public class CompactorTest {
     assertEquals(TCompactionState.CANCELLED, c.getLatestState());
   }
 
+  @Test
+  public void testCompactionWaitProperty() {
+    PowerMock.resetAll();
+    PowerMock.suppress(PowerMock.methods(Halt.class, "halt"));
+    PowerMock.suppress(PowerMock.constructor(AbstractServer.class));
+
+    var conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    conf.set(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "800ms");
+
+    ServerContext context = PowerMock.createNiceMock(ServerContext.class);
+    expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getZooKeeperRoot()).andReturn("test").anyTimes();
+    ZooCache zkc = PowerMock.createNiceMock(ZooCache.class);
+    
expect(zkc.getChildren("test/compactors/testQ")).andReturn(List.of("compactor_1")).anyTimes();
+    expect(context.getZooCache()).andReturn(zkc).anyTimes();
+
+    PowerMock.replayAll();
+
+    SuccessfulCompactor c = new SuccessfulCompactor(null, null, null, context, 
null);
+    PowerMock.verifyAll();
+
+    Long maxWait = c.getWaitTimeBetweenCompactionChecks();
+    // compaction jitter means maxWait is between 0.9 and 1.1 of the desired 
value.
+    assertTrue(maxWait >= 720L);
+    assertTrue(maxWait <= 968L);
+  }
+
 }

Reply via email to