This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e0fa1e775d Refactor ContinuousJfrStarter initialization to register 
as a cluster config change listener (#16987)
5e0fa1e775d is described below

commit 5e0fa1e775dbc42bace3d82d39f77f8d240ff8ab
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 13 14:43:03 2025 +0200

    Refactor ContinuousJfrStarter initialization to register as a cluster 
config change listener (#16987)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |   5 +-
 .../pinot/controller/BaseControllerStarter.java    |   3 +-
 .../core/util/trace/ContinuousJfrStarter.java      | 207 ++++++++++++++-----
 .../core/util/trace/ContinuousJfrStarterTest.java  | 222 +++++++++++++++++++++
 .../org/apache/pinot/minion/BaseMinionStarter.java |   2 +-
 .../server/starter/helix/BaseServerStarter.java    |   2 +-
 .../apache/pinot/spi/env/PinotConfiguration.java   |  10 +-
 7 files changed, 390 insertions(+), 61 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 5d355f6a7a5..fac8e85beeb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -211,8 +211,6 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
         Helix.PREFIX_OF_BROKER_INSTANCE, _instanceId);
 
     _brokerConf.setProperty(Broker.CONFIG_OF_BROKER_ID, _instanceId);
-
-    ContinuousJfrStarter.init(_brokerConf);
   }
 
   /// Can be overridden to apply custom configs to the broker conf.
@@ -537,6 +535,9 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _isStarting = false;
     _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
         System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+
+    
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
     LOGGER.info("Finish starting Pinot broker");
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 06d8917d4eb..f6717d92ea8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -289,7 +289,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     TableConfigUtils.setDisableGroovy(_config.isDisableIngestionGroovy());
     
TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled());
 
-    ContinuousJfrStarter.init(_config);
     ControllerJobTypes.init(_config);
   }
 
@@ -720,6 +719,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     });
 
     
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
+
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
   }
 
   protected PinotLLCRealtimeSegmentManager 
createPinotLLCRealtimeSegmentManager() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
index 646817efa1e..b4158c2a742 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.util.trace;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -26,22 +27,24 @@ import java.nio.file.Paths;
 import java.text.ParseException;
 import java.time.Duration;
 import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
 import jdk.jfr.Configuration;
 import jdk.jfr.Recording;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class ContinuousJfrStarter {
+public class ContinuousJfrStarter implements PinotClusterConfigChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ContinuousJfrStarter.class);
   /// Key that controls whether to enable continuous JFR recording.
   public static final String ENABLED = "enabled";
@@ -118,84 +121,186 @@ public class ContinuousJfrStarter {
   /// A flag to track whether the JFR recording has been started.
   /// This is specially useful for testing and quickstarts, where servers, 
brokers and other components are executed
   /// in the same JVM.
-  private static boolean _started = false;
+  public static final ContinuousJfrStarter INSTANCE = new 
ContinuousJfrStarter();
+  @GuardedBy("this")
+  private boolean _running = false;
+  @GuardedBy("this")
+  private Map<String, Object> _currentConfig;
+  @GuardedBy("this")
+  private Recording _recording;
+  @GuardedBy("this")
+  private Thread _cleanupThread;
 
-  private ContinuousJfrStarter() {
+  @VisibleForTesting
+  protected ContinuousJfrStarter() {
   }
 
-  public synchronized static void init(PinotConfiguration config) {
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    boolean jfrChanged = changedConfigs.stream()
+        .anyMatch(changedConfig -> 
changedConfig.startsWith(CommonConstants.JFR));
+    if (!jfrChanged && _currentConfig != null) {
+      LOGGER.debug("ChangedConfigs: {} does not contain any JFR config. 
Skipping updates", changedConfigs);
+      return;
+    }
+    PinotConfiguration config = new PinotConfiguration(clusterConfigs);
     PinotConfiguration subset = config.subset(CommonConstants.JFR);
 
+    synchronized (this) {
+      Map<String, Object> newSubsetMap = subset.toMap();
+
+      if (_currentConfig != null && _currentConfig.equals(newSubsetMap)) {
+        // No change
+        LOGGER.debug("JFR config change detected, but no actual change in 
config");
+        return;
+      }
+
+      stopRecording();
+      _currentConfig = newSubsetMap;
+      startRecording(subset);
+    }
+  }
+
+  public boolean isRunning() {
+    return _running;
+  }
+
+  private void stopRecording() {
+    if (!_running) {
+      return;
+    }
+    assert _recording != null;
+    LOGGER.debug("Stopping recording {}", _recording.getName());
+    _recording.stop();
+    _recording.close();
+
+    if (_cleanupThread != null) {
+      _cleanupThread.interrupt();
+      try {
+        _cleanupThread.join(5_000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.warn("Interrupted while waiting for cleanup thread to stop");
+      }
+      _cleanupThread = null;
+    }
+
+    LOGGER.info("Stopped continuous JFR recording {}", _recording.getName());
+    _recording = null;
+    _running = false;
+  }
+
+  private void startRecording(PinotConfiguration subset) {
     if (!subset.getProperty(ENABLED, DEFAULT_ENABLED)) {
+      LOGGER.info("Continuous JFR recording is disabled");
       return;
     }
-    if (_started) {
+    if (_running) {
       return;
     }
 
+    _recording = createRecording(subset);
+    _recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
+
+    _recording.setDumpOnExit(subset.getProperty(DUMP_ON_EXIT, 
DEFAULT_DUMP_ON_EXIT));
+
+    prepareFileDumps(subset);
 
-    Recording recording;
+    try {
+      boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
+      if (toDisk) {
+        _recording.setToDisk(true);
+        _recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
+        _recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE, 
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
+      }
+    } catch (DateTimeParseException e) {
+      throw new RuntimeException("Failed to parse duration", e);
+    }
+    _recording.start();
+    LOGGER.info("Started continuous JFR recording {} with configuration: {}", 
_recording.getName(), subset);
+    _running = true;
+  }
+
+  @VisibleForTesting
+  protected static Path getRecordingPath(Path parentDir, String name, Instant 
timestamp) {
+    String filename = "recording-" + name + "-" + timestamp + ".jfr";
+    return parentDir.resolve(filename);
+  }
+
+  private void prepareFileDumps(PinotConfiguration subset) {
+    try {
+      Path directory = Path.of(subset.getProperty(DIRECTORY, 
Paths.get(".").toString()));
+      if (!directory.toFile().canWrite()) {
+        throw new RuntimeException("Cannot write: " + directory);
+      }
+
+      Path recordingPath = getRecordingPath(directory, _recording.getName(), 
Instant.now());
+      _recording.setDestination(recordingPath);
+
+      int maxDumps = subset.getProperty(MAX_DUMPS, DEFAULT_MAX_DUMPS);
+      if (maxDumps > 0) {
+        _cleanupThread = createThread(() -> {
+          while (!Thread.currentThread().isInterrupted()) {
+            cleanUpDumps(directory, maxDumps, _recording.getName());
+            try {
+              Thread.sleep(Duration.ofHours(1).toMillis());
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
+        });
+        _cleanupThread.start();
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to create new recording file", e);
+    }
+  }
+
+  @VisibleForTesting
+  protected Recording createRecording(PinotConfiguration subset) {
     String jfrConfName = subset.getProperty(CONFIGURATION, 
DEFAULT_CONFIGURATION);
     try {
       Configuration configuration = 
Configuration.getConfiguration(jfrConfName);
-      recording = new Recording(configuration);
+      return new Recording(configuration);
     } catch (ParseException e) {
       throw new RuntimeException("Failed to parse JFR configuration '" + 
jfrConfName + "'", e);
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to read JFR configuration '" + 
jfrConfName + "'", e);
     }
-    boolean dumpOnExit = subset.getProperty(DUMP_ON_EXIT, 
DEFAULT_DUMP_ON_EXIT);
-    recording.setDumpOnExit(dumpOnExit);
-    if (dumpOnExit) {
-      try {
-        Path directory = Path.of(subset.getProperty(DIRECTORY, 
Paths.get(".").toString()));
-        if (!directory.toFile().canWrite()) {
-          throw new RuntimeException("Cannot write: " + directory);
-        }
-
-        String timestamp = ZonedDateTime.ofInstant(Instant.now(), 
ZoneOffset.UTC)
-            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
-        String filename = "recording-" + timestamp + ".jfr";
-        Path recordingPath = directory.resolve(filename);
-        recording.setDestination(recordingPath);
-
-        int maxDumps = subset.getProperty(MAX_DUMPS, DEFAULT_MAX_DUMPS);
-        if (maxDumps > 0) {
-          Thread cleanupThread = new Thread(() -> cleanUpDumps(directory, 
maxDumps));
-          cleanupThread.setName("JFR-Dump-Cleanup");
-          cleanupThread.setDaemon(true);
-          cleanupThread.start();
-        }
-      } catch (IOException e) {
-        throw new UncheckedIOException("Failed to create new recording file", 
e);
-      }
-    }
+  }
 
-    try {
-      recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
-      boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
-      if (toDisk) {
-        recording.setToDisk(true);
-        recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
-        recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE, 
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
-      }
-    } catch (DateTimeParseException e) {
-      throw new RuntimeException("Failed to parse duration", e);
-    }
-    recording.start();
-    _started = true;
+  private Thread createThread(Runnable runnable) {
+    Thread thread = new Thread(runnable);
+    thread.setName("JFR-Dump-Cleanup");
+    thread.setDaemon(true);
+    return thread;
   }
 
-  private static void cleanUpDumps(Path directory, int maxDumps) {
+  @VisibleForTesting
+  protected static void cleanUpDumps(Path directory, int maxDumps, String 
recordingName) {
     if (maxDumps < 0) {
       LOGGER.debug("maxDumps is negative, no cleanup will be performed");
       return;
     }
-    File[] files = directory.toFile().listFiles();
+    LOGGER.info("Cleaning up old JFR dumps in {} to keep at most {} dumps", 
directory, maxDumps);
+    File[] files = directory.toFile()
+        .listFiles((dir, name) -> name.startsWith("recording-" + 
recordingName) && name.endsWith(".jfr"));
     if (files == null) {
       return;
     }
     Arrays.sort(files, Comparator.comparing(File::getName).reversed());
+    if (files.length <= maxDumps) {
+      LOGGER.info("No cleanup needed, found {} dumps", files.length);
+      return;
+    }
+    File[] filesToDelete = Arrays.copyOfRange(files, maxDumps, files.length);
+    if (LOGGER.isInfoEnabled()) {
+      String filesToDeleteName = Arrays.stream(filesToDelete)
+          .map(File::getName)
+          .collect(Collectors.joining(", "));
+      LOGGER.info("Found {} dumps, going to delete the following older dumps 
{}", files.length, filesToDeleteName);
+    }
     for (int i = maxDumps; i < files.length; i++) {
       boolean delete = files[i].delete();
       if (!delete) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
new file mode 100644
index 00000000000..99cd3a3eb1b
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util.trace;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import jdk.jfr.Recording;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.assertj.core.api.Assertions;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class ContinuousJfrStarterTest {
+
+  private Recording _recording;
+  private ContinuousJfrStarter _continuousJfrStarter;
+
+  @BeforeMethod
+  public void setUp() {
+    _recording = Mockito.mock(Recording.class);
+    _continuousJfrStarter = new ContinuousJfrStarter() {
+      @Override
+      protected Recording createRecording(PinotConfiguration subset) {
+        return _recording;
+      }
+    };
+  }
+
+  @Test
+  public void disabledByDefault() {
+    _continuousJfrStarter.onChange(Set.of(), Map.of());
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+  }
+
+  @Test
+  public void canBeEnabled() {
+    Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+    _continuousJfrStarter.onChange(Set.of(), config);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be enabled")
+        .isTrue();
+    Mockito.verify(_recording).start();
+  }
+
+  @Test
+  public void canBeTurnedOff() {
+    // First start it
+    Map<String, String> enabledConfig = Map.of("pinot.jfr.enabled", "true");
+    _continuousJfrStarter.onChange(Set.of(), enabledConfig);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be enabled")
+        .isTrue();
+    Mockito.verify(_recording).start();
+
+    // Then stop it
+    Set<String> changed = Set.of("pinot.jfr.enabled");
+    Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+    _continuousJfrStarter.onChange(changed, disabledConfig);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be disabled")
+        .isFalse();
+    Mockito.verify(_recording).stop();
+  }
+
+  @Test
+  public void canBeTurnedOn() {
+    // First setup with it off
+    Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+    _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+
+    // Then start it
+    Set<String> changed = Set.of("pinot.jfr.enabled");
+    Map<String, String> enabledConfig = Map.of("pinot.jfr.enabled", "true");
+    _continuousJfrStarter.onChange(changed, enabledConfig);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be enabled")
+        .isTrue();
+    Mockito.verify(_recording).start();
+  }
+
+  @Test
+  public void noOpWhenOtherPropChanges() {
+    // First setup with it off
+    Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+    _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+
+    // Then call onChange with no relevant changes
+    Set<String> changed = Set.of("some.other.config");
+    Map<String, String> stillDisabledConfig = Map.of("pinot.jfr.enabled", 
"false", "some.other.config", "true");
+    _continuousJfrStarter.onChange(changed, stillDisabledConfig);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should still be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+  }
+
+  @Test
+  public void noOpWhenNewConfigIsEqual() {
+    // First setup with it off
+    Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+    _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+
+    // Then call onChange with the same config
+    Set<String> changed = Set.of("pinot.jfr.enabled");
+    Map<String, String> stillDisabledConfig = Map.of("pinot.jfr.enabled", 
"false");
+    _continuousJfrStarter.onChange(changed, stillDisabledConfig);
+
+    Assertions.assertThat(_continuousJfrStarter.isRunning())
+        .describedAs("Recording should still be disabled")
+        .isFalse();
+    Mockito.verifyNoInteractions(_recording);
+  }
+
+  @Test
+  public void cleanUpThreadDeletesFiles()
+      throws IOException {
+    Path tempDirectory = Files.createTempDirectory("jfr-test-");
+    int maxDumps = 3;
+    long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0, 
ZoneOffset.UTC).toInstant().toEpochMilli();
+    try {
+      long[] dates = IntStream.range(0, maxDumps * 2)
+          .mapToLong(i -> now - i * 3600_000L)
+          .sorted()
+          .toArray();
+      for (long creationDate : dates) {
+        Path path = ContinuousJfrStarter.getRecordingPath(tempDirectory, 
"test", Instant.ofEpochMilli(creationDate));
+        File file = path.toFile();
+        Assertions.assertThat(file.createNewFile())
+            .describedAs("Should be able to create a file in the temp 
directory")
+            .isTrue();
+        Assertions.assertThat(file.setLastModified(creationDate))
+            .describedAs("Should be able to set the last modified time")
+            .isTrue();
+      }
+
+      // Verify that we have 2 * maxDumps files
+      try (var files = Files.list(tempDirectory)) {
+        Assertions.assertThat(files.count())
+            .describedAs("Should have 2 * maxDumps files in the temp 
directory")
+            .isEqualTo(maxDumps * 2);
+      }
+
+      // Run the cleanup
+      ContinuousJfrStarter.cleanUpDumps(tempDirectory, maxDumps, "test");
+
+      // Verify that we have maxDumps files and only the newest ones are kept
+      try (var files = Files.list(tempDirectory)) {
+        var remainingFiles = files.collect(Collectors.toSet());
+        Assertions.assertThat(remainingFiles)
+            .describedAs("Should have maxDumps files in the temp directory")
+            .hasSize(maxDumps);
+        for (int i = 0; i < maxDumps; i++) {
+          long creationDate = dates[dates.length - 1 - i];
+          Instant timestamp = Instant.ofEpochMilli(creationDate);
+          Path expectedPath = 
ContinuousJfrStarter.getRecordingPath(tempDirectory, "test", timestamp);
+          Assertions.assertThat(remainingFiles)
+              .describedAs("Should contain the expected file: %s", 
expectedPath)
+              .contains(expectedPath);
+        }
+      }
+
+      // Clean all files
+      ContinuousJfrStarter.cleanUpDumps(tempDirectory, 0, "test");
+      try (var files = Files.list(tempDirectory)) {
+        Assertions.assertThat(files.count())
+            .describedAs("Should have no files in the temp directory")
+            .isEqualTo(0);
+      }
+    } finally {
+      FileUtils.deleteDirectory(tempDirectory.toFile());
+    }
+  }
+}
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index f12b9ca6a6b..fbcbff886c8 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -141,7 +141,7 @@ public abstract class BaseMinionStarter implements 
ServiceStartable {
         Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
     MinionEventObservers.init(_config, _executorService);
 
-    ContinuousJfrStarter.init(_config);
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
   }
 
   /// Can be overridden to apply custom configs to the minion conf.
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index ddcfd5c2514..3e813338b70 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -276,7 +276,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     _helixManager =
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, _zkAddress);
 
-    ContinuousJfrStarter.init(_serverConf);
+    
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
   }
 
   /// Can be overridden to apply custom configs to the server conf.
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
index 2bf0c2bb9ed..334896b0432 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
@@ -123,7 +123,7 @@ public class PinotConfiguration {
    *
    * @param baseProperties to provide programmatically through a {@link Map}.
    */
-  public PinotConfiguration(Map<String, Object> baseProperties) {
+  public PinotConfiguration(Map<String, ? extends Object> baseProperties) {
     this(baseProperties, new SystemEnvironment().getEnvironmentVariables());
   }
 
@@ -135,7 +135,7 @@ public class PinotConfiguration {
    * @param baseProperties with highest precedences (e.g. CLI arguments)
    * @param environmentVariables as a {@link Map}.
    */
-  public PinotConfiguration(Map<String, Object> baseProperties, Map<String, 
String> environmentVariables) {
+  public PinotConfiguration(Map<String, ? extends Object> baseProperties, 
Map<String, String> environmentVariables) {
     _configuration = new CompositeConfiguration(
         applyDynamicEnvConfig(computeConfigurationsFromSources(baseProperties, 
environmentVariables),
             environmentVariables));
@@ -190,7 +190,7 @@ public class PinotConfiguration {
     }).collect(Collectors.toList());
   }
 
-  private static List<Configuration> 
computeConfigurationsFromSources(Map<String, Object> baseProperties,
+  private static List<Configuration> 
computeConfigurationsFromSources(Map<String, ? extends Object> baseProperties,
       Map<String, String> environmentVariables) {
     Map<String, Object> relaxedBaseProperties = 
relaxProperties(baseProperties);
     // Env is only used to check for config paths to load.
@@ -262,12 +262,12 @@ public class PinotConfiguration {
     return envVarEntry.getKey().replace("_", ".").toLowerCase();
   }
 
-  private static Map<String, Object> relaxProperties(Map<String, Object> 
properties) {
+  private static Map<String, Object> relaxProperties(Map<String, ? extends 
Object> properties) {
     return properties.entrySet().stream()
         .collect(Collectors.toMap(PinotConfiguration::relaxPropertyName, 
Entry::getValue));
   }
 
-  private static String relaxPropertyName(Entry<String, Object> propertyEntry) 
{
+  private static String relaxPropertyName(Entry<String, ? extends Object> 
propertyEntry) {
     return relaxPropertyName(propertyEntry.getKey());
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to