This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f388317a6c8 HDDS-12554. Support callback on completed reconfiguration 
(#8391)
f388317a6c8 is described below

commit f388317a6c848708fe762e833e5201ceecee0b6f
Author: Sarveksha Yeshavantha Raju 
<[email protected]>
AuthorDate: Fri Jun 6 12:08:39 2025 +0530

    HDDS-12554. Support callback on completed reconfiguration (#8391)
---
 .../hadoop/hdds/utils/BackgroundService.java       |  43 +++--
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   2 +
 .../hadoop/hdds/conf/ReconfigurableBase.java       | 209 +++++++++++++++++++++
 .../hdds/conf/ReconfigurationChangeCallback.java   |  29 +++
 .../hadoop/hdds/conf/ReconfigurationHandler.java   |  54 +++++-
 .../hdds/scm/server/StorageContainerManager.java   |   2 +
 .../ozone/reconfig/TestOmReconfiguration.java      |   2 +
 .../hadoop/ozone/shell/TestReconfigShell.java      |  55 +++++-
 .../src/test/resources/ozone-site.xml              |  12 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  13 +-
 .../ozone/om/service/DirectoryDeletingService.java |  23 +++
 11 files changed, 428 insertions(+), 16 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
index 959bee8d8c5..a5df9a1776e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
@@ -41,12 +41,14 @@ public abstract class BackgroundService {
       LoggerFactory.getLogger(BackgroundService.class);
 
   // Executor to launch child tasks
-  private final ScheduledThreadPoolExecutor exec;
-  private final ThreadGroup threadGroup;
+  private ScheduledThreadPoolExecutor exec;
+  private ThreadGroup threadGroup;
   private final String serviceName;
-  private final long interval;
+  private long interval;
   private final long serviceTimeoutInNanos;
-  private final TimeUnit unit;
+  private TimeUnit unit;
+  private final int threadPoolSize;
+  private final String threadNamePrefix;
   private final PeriodicalTask service;
 
   public BackgroundService(String serviceName, long interval,
@@ -62,14 +64,9 @@ public BackgroundService(String serviceName, long interval,
     this.serviceName = serviceName;
     this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit)
             .toLong(TimeUnit.NANOSECONDS);
-    threadGroup = new ThreadGroup(serviceName);
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setThreadFactory(r -> new Thread(threadGroup, r))
-        .setDaemon(true)
-        .setNameFormat(threadNamePrefix + serviceName + "#%d")
-        .build();
-    exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
-        threadPoolSize, threadFactory);
+    this.threadPoolSize = threadPoolSize;
+    this.threadNamePrefix = threadNamePrefix;
+    initExecutorAndThreadGroup();
     service = new PeriodicalTask();
   }
 
@@ -103,10 +100,20 @@ public void runPeriodicalTaskNow() throws Exception {
   }
 
   // start service
-  public void start() {
+  public synchronized void start() {
+    if (exec == null || exec.isShutdown() || exec.isTerminated()) {
+      initExecutorAndThreadGroup();
+    }
+    LOG.info("Starting service {} with interval {} {}", serviceName,
+        interval, unit.name().toLowerCase());
     exec.scheduleWithFixedDelay(service, 0, interval, unit);
   }
 
+  protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
+    this.interval = newInterval;
+    this.unit = newUnit;
+  }
+
   public abstract BackgroundTaskQueue getTasks();
 
   /**
@@ -172,4 +179,14 @@ public void shutdown() {
       threadGroup.destroy();
     }
   }
+
+  private void initExecutorAndThreadGroup() {
+    threadGroup = new ThreadGroup(serviceName);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setThreadFactory(r -> new Thread(threadGroup, r))
+        .setDaemon(true)
+        .setNameFormat(threadNamePrefix + serviceName + "#%d")
+        .build();
+    exec = (ScheduledThreadPoolExecutor) 
Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 585cab9d38a..31dab87935e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -287,6 +287,8 @@ public String getNamespace() {
               .register(REPLICATION_STREAMS_LIMIT_KEY,
                   this::reconfigReplicationStreamsLimit);
 
+      
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
       datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, 
conf,
           dnCertClient, secretKeyClient, this::terminateDatanode,
           reconfigurationHandler);
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
new file mode 100644
index 00000000000..3d3f7e29662
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.conf;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.ConfigRedactor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Reconfigurable;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class to support dynamic reconfiguration of configuration properties 
at runtime.
+ */
+public abstract class ReconfigurableBase extends Configured implements 
Reconfigurable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReconfigurableBase.class);
+  private final ReconfigurationUtil reconfigurationUtil = new 
ReconfigurationUtil();
+  private Thread reconfigThread = null;
+  private volatile boolean shouldRun = true;
+  private final Object reconfigLock = new Object();
+  private long startTime = 0L;
+  private long endTime = 0L;
+  private Map<ReconfigurationUtil.PropertyChange, Optional<String>> status = 
null;
+  private final Collection<Consumer<ReconfigurationTaskStatus>> 
reconfigurationCompleteCallbacks = new ArrayList<>();
+
+  public ReconfigurableBase(Configuration conf) {
+    super(conf == null ? new Configuration() : conf);
+  }
+
+  protected abstract Configuration getNewConf();
+
+  @VisibleForTesting
+  public Collection<ReconfigurationUtil.PropertyChange> 
getChangedProperties(Configuration newConf,
+      Configuration oldConf) {
+    return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+  }
+
+  public void startReconfigurationTask() throws IOException {
+    synchronized (this.reconfigLock) {
+      String errorMessage;
+      if (!this.shouldRun) {
+        errorMessage = "The server is stopped.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      } else if (this.reconfigThread != null) {
+        errorMessage = "Another reconfiguration task is running.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      } else {
+        this.reconfigThread = new ReconfigurationThread(this);
+        this.reconfigThread.setDaemon(true);
+        this.reconfigThread.setName("Reconfiguration Task");
+        this.reconfigThread.start();
+        this.startTime = Time.now();
+      }
+    }
+  }
+
+  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+    synchronized (this.reconfigLock) {
+      return this.reconfigThread != null ? new 
ReconfigurationTaskStatus(this.startTime, 0L, null) :
+          new ReconfigurationTaskStatus(this.startTime, this.endTime, 
this.status);
+    }
+  }
+
+  public void shutdownReconfigurationTask() {
+    Thread tempThread;
+    synchronized (this.reconfigLock) {
+      this.shouldRun = false;
+      if (this.reconfigThread == null) {
+        return;
+      }
+
+      tempThread = this.reconfigThread;
+      this.reconfigThread = null;
+    }
+
+    try {
+      tempThread.join();
+    } catch (InterruptedException ignored) {
+    }
+
+  }
+
+  @Override
+  public final void reconfigureProperty(String property, String newVal) throws 
ReconfigurationException {
+    if (this.isPropertyReconfigurable(property)) {
+      LOG.info("changing property " + property + " to " + newVal);
+      synchronized (this.getConf()) {
+        this.getConf().get(property);
+        String effectiveValue = this.reconfigurePropertyImpl(property, newVal);
+        if (newVal != null) {
+          this.getConf().set(property, effectiveValue);
+        } else {
+          this.getConf().unset(property);
+        }
+      }
+    } else {
+      throw new ReconfigurationException(property, newVal, 
this.getConf().get(property));
+    }
+  }
+
+  @Override
+  public abstract Collection<String> getReconfigurableProperties();
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    return this.getReconfigurableProperties().contains(property);
+  }
+
+  protected abstract String reconfigurePropertyImpl(String var1, String var2) 
throws ReconfigurationException;
+
+  private static class ReconfigurationThread extends Thread {
+    private final ReconfigurableBase parent;
+
+    ReconfigurationThread(ReconfigurableBase base) {
+      this.parent = base;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Starting reconfiguration task.");
+      Configuration oldConf = this.parent.getConf();
+      Configuration newConf = this.parent.getNewConf();
+      Collection<ReconfigurationUtil.PropertyChange> changes = 
this.parent.getChangedProperties(newConf, oldConf);
+      Map<ReconfigurationUtil.PropertyChange, Optional<String>> results = 
Maps.newHashMap();
+      ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
+      ConfigRedactor newRedactor = new ConfigRedactor(newConf);
+
+      for (ReconfigurationUtil.PropertyChange change : changes) {
+        String errorMessage = null;
+        String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
+        String newValRedacted = newRedactor.redact(change.prop, change.newVal);
+        if (!this.parent.isPropertyReconfigurable(change.prop)) {
+          LOG.info(String.format("Property %s is not configurable: old value: 
%s, new value: %s",
+              change.prop, oldValRedacted, newValRedacted));
+        } else {
+          LOG.info("Change property: " + change.prop + " from \"" +
+              (change.oldVal == null ? "<default>" : oldValRedacted) + "\" to 
\"" +
+              (change.newVal == null ? "<default>" : newValRedacted) + "\".");
+
+          try {
+            String effectiveValue = 
this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
+            if (change.newVal != null) {
+              oldConf.set(change.prop, effectiveValue);
+            } else {
+              oldConf.unset(change.prop);
+            }
+          } catch (ReconfigurationException reconfException) {
+            Throwable cause = reconfException.getCause();
+            errorMessage = cause == null ? reconfException.getMessage() : 
cause.getMessage();
+            LOG.error("Failed to reconfigure property {}: {}", change.prop, 
errorMessage, reconfException);
+          }
+
+          results.put(change, Optional.ofNullable(errorMessage));
+        }
+      }
+
+      synchronized (this.parent.reconfigLock) {
+        this.parent.endTime = Time.now();
+        this.parent.status = Collections.unmodifiableMap(results);
+        this.parent.reconfigThread = null;
+
+        for (Consumer<ReconfigurationTaskStatus> callback : 
parent.reconfigurationCompleteCallbacks) {
+          try {
+            callback.accept(parent.getReconfigurationTaskStatus());
+          } catch (Exception e) {
+            LOG.warn("Reconfiguration complete callback threw exception", e);
+          }
+        }
+      }
+    }
+  }
+
+  public void 
addReconfigurationCompleteCallback(Consumer<ReconfigurationTaskStatus> 
callback) {
+    synchronized (reconfigLock) {
+      this.reconfigurationCompleteCallbacks.add(callback);
+    }
+  }
+
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
new file mode 100644
index 00000000000..810df7870d1
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.conf;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Callback interface to handle configuration changes after a reconfiguration 
task completes.
+ */
+@FunctionalInterface
+public interface ReconfigurationChangeCallback {
+  void onPropertiesChanged(Map<String, Boolean> changedKeys, Configuration 
newConf);
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
index a594bfa2760..979525f7a1a 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java
@@ -22,18 +22,22 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.function.UnaryOperator;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.ReconfigurableBase;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
 import org.apache.ratis.util.function.CheckedConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Keeps track of reconfigurable properties and the corresponding functions
@@ -42,16 +46,64 @@
 public class ReconfigurationHandler extends ReconfigurableBase
     implements ReconfigureProtocol {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReconfigurationHandler.class);
   private final String name;
   private final CheckedConsumer<String, IOException> requireAdminPrivilege;
   private final Map<String, UnaryOperator<String>> properties =
       new ConcurrentHashMap<>();
 
+  private final List<ReconfigurationChangeCallback> completeCallbacks = new 
ArrayList<>();
+  private BiConsumer<ReconfigurationTaskStatus, Configuration> 
reconfigurationStatusListener;
+
+  public void registerCompleteCallback(ReconfigurationChangeCallback callback) 
{
+    completeCallbacks.add(callback);
+  }
+
+  public void 
setReconfigurationCompleteCallback(BiConsumer<ReconfigurationTaskStatus, 
Configuration>
+      statusListener) {
+    this.reconfigurationStatusListener = statusListener;
+  }
+
+  public BiConsumer<ReconfigurationTaskStatus, Configuration> 
defaultLoggingCallback() {
+    return (status, conf) -> {
+      if (status.getStatus() != null && !status.getStatus().isEmpty()) {
+        LOG.info("Reconfiguration completed with {} updated properties.",
+            status.getStatus().size());
+      } else {
+        LOG.info("Reconfiguration complete. No properties were changed.");
+      }
+    };
+  }
+
+  private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, 
Configuration newConf) {
+    if (status.getStatus() != null && !status.getStatus().isEmpty()) {
+      Map<String, Boolean> changedKeys = new HashMap<>();
+      for (ReconfigurationUtil.PropertyChange change : 
status.getStatus().keySet()) {
+        boolean deleted = change.newVal == null;
+        changedKeys.put(change.prop, !deleted);
+      }
+      for (ReconfigurationChangeCallback callback : completeCallbacks) {
+        callback.onPropertiesChanged(changedKeys, newConf);
+      }
+    }
+
+    if (reconfigurationStatusListener != null) {
+      reconfigurationStatusListener.accept(status, newConf);
+    }
+  }
+
   public ReconfigurationHandler(String name, OzoneConfiguration config,
       CheckedConsumer<String, IOException> requireAdminPrivilege) {
     super(config);
     this.name = name;
     this.requireAdminPrivilege = requireAdminPrivilege;
+
+    // Register callback on reconfiguration complete
+    addReconfigurationCompleteCallback(status -> {
+      Configuration newConf = getNewConf();
+      triggerCompleteCallbacks(status, newConf);
+    });
+
   }
 
   public ReconfigurationHandler register(
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7773a91fec9..36b29c6a079 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -403,6 +403,8 @@ private StorageContainerManager(OzoneConfiguration conf,
             .register(OZONE_READONLY_ADMINISTRATORS,
                 this::reconfOzoneReadOnlyAdmins);
 
+    
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
+
     initializeSystemManagers(conf, configurator);
 
     if (isSecretKeyEnable(securityConfig)) {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
index 05e7e2f0f3e..55172f78f00 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java
@@ -19,6 +19,7 @@
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
@@ -51,6 +52,7 @@ void reconfigurableProperties() {
         .add(OZONE_KEY_DELETING_LIMIT_PER_TASK)
         .add(OZONE_OM_VOLUME_LISTALL_ALLOWED)
         .add(OZONE_READONLY_ADMINISTRATORS)
+        .add(OZONE_DIR_DELETING_SERVICE_INTERVAL)
         .addAll(new OmConfig().reconfigurableProperties())
         .build();
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
index b43536f5bd7..289044f89db 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.ozone.shell;
 
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import org.apache.hadoop.conf.ReconfigurableBase;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurableBase;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -33,7 +38,9 @@
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.admin.OzoneAdmin;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.apache.ozone.test.NonHATests;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,6 +54,8 @@
 public abstract class TestReconfigShell implements NonHATests.TestCase {
 
   private OzoneAdmin ozoneAdmin;
+  private OzoneConfiguration conf;
+  private ReconfigurationHandler reconfigurationHandler;
   private GenericTestUtils.PrintStreamCapturer out;
   private GenericTestUtils.PrintStreamCapturer err;
 
@@ -55,6 +64,8 @@ void capture() {
     out = GenericTestUtils.captureOut();
     err = GenericTestUtils.captureErr();
     ozoneAdmin = new OzoneAdmin();
+    conf = new OzoneConfiguration();
+    reconfigurationHandler = 
cluster().getOzoneManager().getReconfigurationHandler();
   }
 
   @AfterEach
@@ -77,6 +88,35 @@ void testOzoneManagerGetReconfigurationProperties() {
     executeAndAssertProperties(om.getReconfigurationHandler(), "OM", socket);
   }
 
+  @Test
+  void testDirectoryDeletingServiceIntervalReconfiguration() throws 
ReconfigurationException {
+    OzoneManager om = cluster().getOzoneManager();
+    InetSocketAddress socket = om.getOmRpcServerAddr();
+    LogCapturer logCapturer = 
LogCapturer.captureLogs(DirectoryDeletingService.class);
+
+    String initialInterval = "1m";
+    String intervalFromXML = "2m"; //config value set in ozone-site.xml
+    long intervalFromXMLInSeconds = TimeUnit.MINUTES.toSeconds(2); //120 
seconds
+
+    
reconfigurationHandler.reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL,
 initialInterval);
+    
assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(initialInterval);
+
+    //Start the reconfiguration task
+    executeAndAssertStart("OM", socket);
+    //If config value is set in ozone-site.xml then it is picked up during 
reconfiguration
+    
assertThat(conf.get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+
+    executeAndAssertStatus("OM", socket);
+    
assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML);
+    assertThat(out.get()).contains(
+        String.format("SUCCESS: Changed property %s", 
OZONE_DIR_DELETING_SERVICE_INTERVAL)
+    );
+    assertThat(logCapturer.getOutput()).contains(
+        String.format("Updating and restarting DirectoryDeletingService with 
interval: %d %s",
+            intervalFromXMLInSeconds, TimeUnit.SECONDS.name().toLowerCase())
+    );
+  }
+
   @Test
   void testStorageContainerManagerGetReconfigurationProperties() {
     StorageContainerManager scm = cluster().getStorageContainerManager();
@@ -130,4 +170,17 @@ private void executeForInServiceDatanodes(int 
expectedCount) {
   private String getAddress(InetSocketAddress socket) {
     return socket.getHostString() + ":" + socket.getPort();
   }
+
+  private void executeAndAssertStart(String service, InetSocketAddress socket) 
{
+    String address = socket.getHostString() + ":" + socket.getPort();
+    ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", 
address, "start");
+    assertThat(out.get()).contains(service + ": Started reconfiguration task 
on node [" + address + "]");
+  }
+
+  private void executeAndAssertStatus(String service, InetSocketAddress 
socket) {
+    String address = socket.getHostString() + ":" + socket.getPort();
+    ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", 
address, "status");
+    assertThat(out.get()).contains(service + ": Reconfiguring status for node 
[" + address + "]: started");
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml 
b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
index 5ea2eb89dfa..2b07b1d060d 100644
--- a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
+++ b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml
@@ -127,5 +127,17 @@
     <name>ozone.client.datastream.window.size</name>
     <value>8MB</value>
   </property>
+  <property>
+    <name>ozone.readonly.administrators</name>
+    <value>admin</value>
+  </property>
+  <property>
+    <name>ozone.administrators</name>
+    <value>admin</value>
+  </property>
+  <property>
+  <name>ozone.directory.deleting.service.interval</name>
+  <value>2m</value>
+  </property>
 
 </configuration>
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index ad1741df0a5..ec2b9964f53 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -52,6 +52,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT;
@@ -280,6 +281,7 @@
 import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
 import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
 import org.apache.hadoop.ozone.om.service.CompactDBService;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
@@ -494,6 +496,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   // instance creation every single time.
   private UncheckedAutoCloseableSupplier<IOmMetadataReader> rcOmMetadataReader;
   private OmSnapshotManager omSnapshotManager;
+  private volatile DirectoryDeletingService dirDeletingService;
 
   @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -519,7 +522,10 @@ private OzoneManager(OzoneConfiguration conf, 
StartupOption startupOption)
                 this::reconfOzoneReadOnlyAdmins)
             .register(OZONE_OM_VOLUME_LISTALL_ALLOWED, 
this::reconfigureAllowListAllVolumes)
             .register(OZONE_KEY_DELETING_LIMIT_PER_TASK,
-                this::reconfOzoneKeyDeletingLimitPerTask);
+                this::reconfOzoneKeyDeletingLimitPerTask)
+            .register(OZONE_DIR_DELETING_SERVICE_INTERVAL, 
this::reconfOzoneDirDeletingServiceInterval);
+
+    
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());
 
     versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion());
     upgradeFinalizer = new OMUpgradeFinalizer(versionManager);
@@ -5148,6 +5154,11 @@ private String reconfigureAllowListAllVolumes(String 
newVal) {
     return String.valueOf(allowListAllVolumes);
   }
 
+  private String reconfOzoneDirDeletingServiceInterval(String newVal) {
+    getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal);
+    return newVal;
+  }
+
   public void validateReplicationConfig(ReplicationConfig replicationConfig)
       throws OMException {
     try {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 7451032492e..ad90490101c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -17,6 +17,9 @@
 
 package org.apache.hadoop.ozone.om.service;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
@@ -100,9 +104,28 @@ public DirectoryDeletingService(long interval, TimeUnit 
unit,
     this.isRunningOnAOS = new AtomicBoolean(false);
     this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
     deletedDirSupplier = new DeletedDirSupplier();
+    registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), 
configuration);
     taskCount.set(0);
   }
 
+  public void registerReconfigCallbacks(ReconfigurationHandler handler, 
OzoneConfiguration conf) {
+    handler.registerCompleteCallback((changedKeys, newConf) -> {
+      if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) {
+        updateAndRestart(conf);
+      }
+    });
+  }
+
+  private synchronized void updateAndRestart(OzoneConfiguration conf) {
+    long newInterval = 
conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL,
+        OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    LOG.info("Updating and restarting DirectoryDeletingService with interval: 
{} {}",
+        newInterval, TimeUnit.SECONDS.name().toLowerCase());
+    shutdown();
+    setInterval(newInterval, TimeUnit.SECONDS);
+    start();
+  }
+
   private boolean shouldRun() {
     if (getOzoneManager() == null) {
       // OzoneManager can be null for testing


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to