This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5e0fa1e775d Refactor ContinuousJfrStarter initialization to register
as a cluster config change listener (#16987)
5e0fa1e775d is described below
commit 5e0fa1e775dbc42bace3d82d39f77f8d240ff8ab
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 13 14:43:03 2025 +0200
Refactor ContinuousJfrStarter initialization to register as a cluster
config change listener (#16987)
---
.../broker/broker/helix/BaseBrokerStarter.java | 5 +-
.../pinot/controller/BaseControllerStarter.java | 3 +-
.../core/util/trace/ContinuousJfrStarter.java | 207 ++++++++++++++-----
.../core/util/trace/ContinuousJfrStarterTest.java | 222 +++++++++++++++++++++
.../org/apache/pinot/minion/BaseMinionStarter.java | 2 +-
.../server/starter/helix/BaseServerStarter.java | 2 +-
.../apache/pinot/spi/env/PinotConfiguration.java | 10 +-
7 files changed, 390 insertions(+), 61 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 5d355f6a7a5..fac8e85beeb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -211,8 +211,6 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
Helix.PREFIX_OF_BROKER_INSTANCE, _instanceId);
_brokerConf.setProperty(Broker.CONFIG_OF_BROKER_ID, _instanceId);
-
- ContinuousJfrStarter.init(_brokerConf);
}
/// Can be overridden to apply custom configs to the broker conf.
@@ -537,6 +535,9 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_isStarting = false;
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+
+
_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
LOGGER.info("Finish starting Pinot broker");
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 06d8917d4eb..f6717d92ea8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -289,7 +289,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
TableConfigUtils.setDisableGroovy(_config.isDisableIngestionGroovy());
TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled());
- ContinuousJfrStarter.init(_config);
ControllerJobTypes.init(_config);
}
@@ -720,6 +719,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
});
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
+
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
}
protected PinotLLCRealtimeSegmentManager
createPinotLLCRealtimeSegmentManager() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
index 646817efa1e..b4158c2a742 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/ContinuousJfrStarter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.util.trace;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -26,22 +27,24 @@ import java.nio.file.Paths;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
import jdk.jfr.Configuration;
import jdk.jfr.Recording;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ContinuousJfrStarter {
+public class ContinuousJfrStarter implements PinotClusterConfigChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ContinuousJfrStarter.class);
/// Key that controls whether to enable continuous JFR recording.
public static final String ENABLED = "enabled";
@@ -118,84 +121,186 @@ public class ContinuousJfrStarter {
/// A flag to track whether the JFR recording has been started.
/// This is specially useful for testing and quickstarts, where servers,
brokers and other components are executed
/// in the same JVM.
- private static boolean _started = false;
+ public static final ContinuousJfrStarter INSTANCE = new
ContinuousJfrStarter();
+ @GuardedBy("this")
+ private boolean _running = false;
+ @GuardedBy("this")
+ private Map<String, Object> _currentConfig;
+ @GuardedBy("this")
+ private Recording _recording;
+ @GuardedBy("this")
+ private Thread _cleanupThread;
- private ContinuousJfrStarter() {
+ @VisibleForTesting
+ protected ContinuousJfrStarter() {
}
- public synchronized static void init(PinotConfiguration config) {
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ boolean jfrChanged = changedConfigs.stream()
+ .anyMatch(changedConfig ->
changedConfig.startsWith(CommonConstants.JFR));
+ if (!jfrChanged && _currentConfig != null) {
+ LOGGER.debug("ChangedConfigs: {} does not contain any JFR config.
Skipping updates", changedConfigs);
+ return;
+ }
+ PinotConfiguration config = new PinotConfiguration(clusterConfigs);
PinotConfiguration subset = config.subset(CommonConstants.JFR);
+ synchronized (this) {
+ Map<String, Object> newSubsetMap = subset.toMap();
+
+ if (_currentConfig != null && _currentConfig.equals(newSubsetMap)) {
+ // No change
+ LOGGER.debug("JFR config change detected, but no actual change in
config");
+ return;
+ }
+
+ stopRecording();
+ _currentConfig = newSubsetMap;
+ startRecording(subset);
+ }
+ }
+
+ public boolean isRunning() {
+ return _running;
+ }
+
+ private void stopRecording() {
+ if (!_running) {
+ return;
+ }
+ assert _recording != null;
+ LOGGER.debug("Stopping recording {}", _recording.getName());
+ _recording.stop();
+ _recording.close();
+
+ if (_cleanupThread != null) {
+ _cleanupThread.interrupt();
+ try {
+ _cleanupThread.join(5_000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for cleanup thread to stop");
+ }
+ _cleanupThread = null;
+ }
+
+ LOGGER.info("Stopped continuous JFR recording {}", _recording.getName());
+ _recording = null;
+ _running = false;
+ }
+
+ private void startRecording(PinotConfiguration subset) {
if (!subset.getProperty(ENABLED, DEFAULT_ENABLED)) {
+ LOGGER.info("Continuous JFR recording is disabled");
return;
}
- if (_started) {
+ if (_running) {
return;
}
+ _recording = createRecording(subset);
+ _recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
+
+ _recording.setDumpOnExit(subset.getProperty(DUMP_ON_EXIT,
DEFAULT_DUMP_ON_EXIT));
+
+ prepareFileDumps(subset);
- Recording recording;
+ try {
+ boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
+ if (toDisk) {
+ _recording.setToDisk(true);
+ _recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
+ _recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE,
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
+ }
+ } catch (DateTimeParseException e) {
+ throw new RuntimeException("Failed to parse duration", e);
+ }
+ _recording.start();
+ LOGGER.info("Started continuous JFR recording {} with configuration: {}",
_recording.getName(), subset);
+ _running = true;
+ }
+
+ @VisibleForTesting
+ protected static Path getRecordingPath(Path parentDir, String name, Instant
timestamp) {
+ String filename = "recording-" + name + "-" + timestamp + ".jfr";
+ return parentDir.resolve(filename);
+ }
+
+ private void prepareFileDumps(PinotConfiguration subset) {
+ try {
+ Path directory = Path.of(subset.getProperty(DIRECTORY,
Paths.get(".").toString()));
+ if (!directory.toFile().canWrite()) {
+ throw new RuntimeException("Cannot write: " + directory);
+ }
+
+ Path recordingPath = getRecordingPath(directory, _recording.getName(),
Instant.now());
+ _recording.setDestination(recordingPath);
+
+ int maxDumps = subset.getProperty(MAX_DUMPS, DEFAULT_MAX_DUMPS);
+ if (maxDumps > 0) {
+ _cleanupThread = createThread(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ cleanUpDumps(directory, maxDumps, _recording.getName());
+ try {
+ Thread.sleep(Duration.ofHours(1).toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ });
+ _cleanupThread.start();
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new recording file", e);
+ }
+ }
+
+ @VisibleForTesting
+ protected Recording createRecording(PinotConfiguration subset) {
String jfrConfName = subset.getProperty(CONFIGURATION,
DEFAULT_CONFIGURATION);
try {
Configuration configuration =
Configuration.getConfiguration(jfrConfName);
- recording = new Recording(configuration);
+ return new Recording(configuration);
} catch (ParseException e) {
throw new RuntimeException("Failed to parse JFR configuration '" +
jfrConfName + "'", e);
} catch (IOException e) {
throw new UncheckedIOException("Failed to read JFR configuration '" +
jfrConfName + "'", e);
}
- boolean dumpOnExit = subset.getProperty(DUMP_ON_EXIT,
DEFAULT_DUMP_ON_EXIT);
- recording.setDumpOnExit(dumpOnExit);
- if (dumpOnExit) {
- try {
- Path directory = Path.of(subset.getProperty(DIRECTORY,
Paths.get(".").toString()));
- if (!directory.toFile().canWrite()) {
- throw new RuntimeException("Cannot write: " + directory);
- }
-
- String timestamp = ZonedDateTime.ofInstant(Instant.now(),
ZoneOffset.UTC)
- .format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"));
- String filename = "recording-" + timestamp + ".jfr";
- Path recordingPath = directory.resolve(filename);
- recording.setDestination(recordingPath);
-
- int maxDumps = subset.getProperty(MAX_DUMPS, DEFAULT_MAX_DUMPS);
- if (maxDumps > 0) {
- Thread cleanupThread = new Thread(() -> cleanUpDumps(directory,
maxDumps));
- cleanupThread.setName("JFR-Dump-Cleanup");
- cleanupThread.setDaemon(true);
- cleanupThread.start();
- }
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to create new recording file",
e);
- }
- }
+ }
- try {
- recording.setName(subset.getProperty(NAME, DEFAULT_NAME));
- boolean toDisk = subset.getProperty(TO_DISK, DEFAULT_TO_DISK);
- if (toDisk) {
- recording.setToDisk(true);
- recording.setMaxSize(subset.getProperty(MAX_SIZE, DEFAULT_MAX_SIZE));
- recording.setMaxAge(Duration.parse(subset.getProperty(MAX_AGE,
DEFAULT_MAX_AGE).toUpperCase(Locale.ENGLISH)));
- }
- } catch (DateTimeParseException e) {
- throw new RuntimeException("Failed to parse duration", e);
- }
- recording.start();
- _started = true;
+ private Thread createThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.setName("JFR-Dump-Cleanup");
+ thread.setDaemon(true);
+ return thread;
}
- private static void cleanUpDumps(Path directory, int maxDumps) {
+ @VisibleForTesting
+ protected static void cleanUpDumps(Path directory, int maxDumps, String
recordingName) {
if (maxDumps < 0) {
LOGGER.debug("maxDumps is negative, no cleanup will be performed");
return;
}
- File[] files = directory.toFile().listFiles();
+ LOGGER.info("Cleaning up old JFR dumps in {} to keep at most {} dumps",
directory, maxDumps);
+ File[] files = directory.toFile()
+ .listFiles((dir, name) -> name.startsWith("recording-" +
recordingName) && name.endsWith(".jfr"));
if (files == null) {
return;
}
Arrays.sort(files, Comparator.comparing(File::getName).reversed());
+ if (files.length <= maxDumps) {
+ LOGGER.info("No cleanup needed, found {} dumps", files.length);
+ return;
+ }
+ File[] filesToDelete = Arrays.copyOfRange(files, maxDumps, files.length);
+ if (LOGGER.isInfoEnabled()) {
+ String filesToDeleteName = Arrays.stream(filesToDelete)
+ .map(File::getName)
+ .collect(Collectors.joining(", "));
+ LOGGER.info("Found {} dumps, going to delete the following older dumps
{}", files.length, filesToDeleteName);
+ }
for (int i = maxDumps; i < files.length; i++) {
boolean delete = files[i].delete();
if (!delete) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
new file mode 100644
index 00000000000..99cd3a3eb1b
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/util/trace/ContinuousJfrStarterTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util.trace;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import jdk.jfr.Recording;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.assertj.core.api.Assertions;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class ContinuousJfrStarterTest {
+
+ private Recording _recording;
+ private ContinuousJfrStarter _continuousJfrStarter;
+
+ @BeforeMethod
+ public void setUp() {
+ _recording = Mockito.mock(Recording.class);
+ _continuousJfrStarter = new ContinuousJfrStarter() {
+ @Override
+ protected Recording createRecording(PinotConfiguration subset) {
+ return _recording;
+ }
+ };
+ }
+
+ @Test
+ public void disabledByDefault() {
+ _continuousJfrStarter.onChange(Set.of(), Map.of());
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+ }
+
+ @Test
+ public void canBeEnabled() {
+ Map<String, String> config = Map.of("pinot.jfr.enabled", "true");
+ _continuousJfrStarter.onChange(Set.of(), config);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be enabled")
+ .isTrue();
+ Mockito.verify(_recording).start();
+ }
+
+ @Test
+ public void canBeTurnedOff() {
+ // First start it
+ Map<String, String> enabledConfig = Map.of("pinot.jfr.enabled", "true");
+ _continuousJfrStarter.onChange(Set.of(), enabledConfig);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be enabled")
+ .isTrue();
+ Mockito.verify(_recording).start();
+
+ // Then stop it
+ Set<String> changed = Set.of("pinot.jfr.enabled");
+ Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+ _continuousJfrStarter.onChange(changed, disabledConfig);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be disabled")
+ .isFalse();
+ Mockito.verify(_recording).stop();
+ }
+
+ @Test
+ public void canBeTurnedOn() {
+ // First setup with it off
+ Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+ _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+
+ // Then start it
+ Set<String> changed = Set.of("pinot.jfr.enabled");
+ Map<String, String> enabledConfig = Map.of("pinot.jfr.enabled", "true");
+ _continuousJfrStarter.onChange(changed, enabledConfig);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be enabled")
+ .isTrue();
+ Mockito.verify(_recording).start();
+ }
+
+ @Test
+ public void noOpWhenOtherPropChanges() {
+ // First setup with it off
+ Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+ _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+
+ // Then call onChange with no relevant changes
+ Set<String> changed = Set.of("some.other.config");
+ Map<String, String> stillDisabledConfig = Map.of("pinot.jfr.enabled",
"false", "some.other.config", "true");
+ _continuousJfrStarter.onChange(changed, stillDisabledConfig);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should still be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+ }
+
+ @Test
+ public void noOpWhenNewConfigIsEqual() {
+ // First setup with it off
+ Map<String, String> disabledConfig = Map.of("pinot.jfr.enabled", "false");
+ _continuousJfrStarter.onChange(Set.of(), disabledConfig);
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+
+ // Then call onChange with the same config
+ Set<String> changed = Set.of("pinot.jfr.enabled");
+ Map<String, String> stillDisabledConfig = Map.of("pinot.jfr.enabled",
"false");
+ _continuousJfrStarter.onChange(changed, stillDisabledConfig);
+
+ Assertions.assertThat(_continuousJfrStarter.isRunning())
+ .describedAs("Recording should still be disabled")
+ .isFalse();
+ Mockito.verifyNoInteractions(_recording);
+ }
+
+ @Test
+ public void cleanUpThreadDeletesFiles()
+ throws IOException {
+ Path tempDirectory = Files.createTempDirectory("jfr-test-");
+ int maxDumps = 3;
+ long now = ZonedDateTime.of(2025, 10, 13, 12, 0, 0, 0,
ZoneOffset.UTC).toInstant().toEpochMilli();
+ try {
+ long[] dates = IntStream.range(0, maxDumps * 2)
+ .mapToLong(i -> now - i * 3600_000L)
+ .sorted()
+ .toArray();
+ for (long creationDate : dates) {
+ Path path = ContinuousJfrStarter.getRecordingPath(tempDirectory,
"test", Instant.ofEpochMilli(creationDate));
+ File file = path.toFile();
+ Assertions.assertThat(file.createNewFile())
+ .describedAs("Should be able to create a file in the temp
directory")
+ .isTrue();
+ Assertions.assertThat(file.setLastModified(creationDate))
+ .describedAs("Should be able to set the last modified time")
+ .isTrue();
+ }
+
+ // Verify that we have 2 * maxDumps files
+ try (var files = Files.list(tempDirectory)) {
+ Assertions.assertThat(files.count())
+ .describedAs("Should have 2 * maxDumps files in the temp
directory")
+ .isEqualTo(maxDumps * 2);
+ }
+
+ // Run the cleanup
+ ContinuousJfrStarter.cleanUpDumps(tempDirectory, maxDumps, "test");
+
+ // Verify that we have maxDumps files and only the newest ones are kept
+ try (var files = Files.list(tempDirectory)) {
+ var remainingFiles = files.collect(Collectors.toSet());
+ Assertions.assertThat(remainingFiles)
+ .describedAs("Should have maxDumps files in the temp directory")
+ .hasSize(maxDumps);
+ for (int i = 0; i < maxDumps; i++) {
+ long creationDate = dates[dates.length - 1 - i];
+ Instant timestamp = Instant.ofEpochMilli(creationDate);
+ Path expectedPath =
ContinuousJfrStarter.getRecordingPath(tempDirectory, "test", timestamp);
+ Assertions.assertThat(remainingFiles)
+ .describedAs("Should contain the expected file: %s",
expectedPath)
+ .contains(expectedPath);
+ }
+ }
+
+ // Clean all files
+ ContinuousJfrStarter.cleanUpDumps(tempDirectory, 0, "test");
+ try (var files = Files.list(tempDirectory)) {
+ Assertions.assertThat(files.count())
+ .describedAs("Should have no files in the temp directory")
+ .isEqualTo(0);
+ }
+ } finally {
+ FileUtils.deleteDirectory(tempDirectory.toFile());
+ }
+ }
+}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index f12b9ca6a6b..fbcbff886c8 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -141,7 +141,7 @@ public abstract class BaseMinionStarter implements
ServiceStartable {
Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
MinionEventObservers.init(_config, _executorService);
- ContinuousJfrStarter.init(_config);
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
}
/// Can be overridden to apply custom configs to the minion conf.
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index ddcfd5c2514..3e813338b70 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -276,7 +276,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_helixManager =
HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, _zkAddress);
- ContinuousJfrStarter.init(_serverConf);
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
}
/// Can be overridden to apply custom configs to the server conf.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
index 2bf0c2bb9ed..334896b0432 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/env/PinotConfiguration.java
@@ -123,7 +123,7 @@ public class PinotConfiguration {
*
* @param baseProperties to provide programmatically through a {@link Map}.
*/
- public PinotConfiguration(Map<String, Object> baseProperties) {
+ public PinotConfiguration(Map<String, ? extends Object> baseProperties) {
this(baseProperties, new SystemEnvironment().getEnvironmentVariables());
}
@@ -135,7 +135,7 @@ public class PinotConfiguration {
* @param baseProperties with highest precedences (e.g. CLI arguments)
* @param environmentVariables as a {@link Map}.
*/
- public PinotConfiguration(Map<String, Object> baseProperties, Map<String,
String> environmentVariables) {
+ public PinotConfiguration(Map<String, ? extends Object> baseProperties,
Map<String, String> environmentVariables) {
_configuration = new CompositeConfiguration(
applyDynamicEnvConfig(computeConfigurationsFromSources(baseProperties,
environmentVariables),
environmentVariables));
@@ -190,7 +190,7 @@ public class PinotConfiguration {
}).collect(Collectors.toList());
}
- private static List<Configuration>
computeConfigurationsFromSources(Map<String, Object> baseProperties,
+ private static List<Configuration>
computeConfigurationsFromSources(Map<String, ? extends Object> baseProperties,
Map<String, String> environmentVariables) {
Map<String, Object> relaxedBaseProperties =
relaxProperties(baseProperties);
// Env is only used to check for config paths to load.
@@ -262,12 +262,12 @@ public class PinotConfiguration {
return envVarEntry.getKey().replace("_", ".").toLowerCase();
}
- private static Map<String, Object> relaxProperties(Map<String, Object>
properties) {
+ private static Map<String, Object> relaxProperties(Map<String, ? extends
Object> properties) {
return properties.entrySet().stream()
.collect(Collectors.toMap(PinotConfiguration::relaxPropertyName,
Entry::getValue));
}
- private static String relaxPropertyName(Entry<String, Object> propertyEntry)
{
+ private static String relaxPropertyName(Entry<String, ? extends Object>
propertyEntry) {
return relaxPropertyName(propertyEntry.getKey());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]