This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8ffa381 Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278) 8ffa381 is described below commit 8ffa381f32a034aae070e087e16db2e416df88cc Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Wed Jun 5 17:38:59 2019 -0700 Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests (#4278) * Add OfflineSegmentIntervalChecker to PeriodicTasksIntegrationTests * Addressed comments --- .../pinot/common/metrics/ValidationMetrics.java | 14 ++++++-- .../apache/pinot/controller/ControllerConf.java | 5 +++ .../validation/OfflineSegmentIntervalChecker.java | 5 +++ .../ControllerPeriodicTasksIntegrationTests.java | 37 +++++++++++++++++++--- 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java index 88d300b..8f4b4fd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.metrics; +import com.google.common.annotations.VisibleForTesting; import com.yammer.metrics.core.Gauge; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; @@ -33,7 +34,6 @@ import java.util.concurrent.TimeUnit; */ public class ValidationMetrics { private final MetricsRegistry _metricsRegistry; - private final Map<String, Long> _gaugeValues = new HashMap<>(); private final Set<MetricName> _metricNames = new HashSet<>(); @@ -216,7 +216,8 @@ public class ValidationMetrics { makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, segmentCount); } - private String makeGaugeName(final String resource, final String gaugeName) { + @VisibleForTesting + public static String makeGaugeName(final String resource, final String gaugeName) { return "pinot.controller." + resource + "." + gaugeName; } @@ -246,4 +247,13 @@ public class ValidationMetrics { _metricNames.clear(); _gaugeValues.clear(); } + + @VisibleForTesting + public long getValueOfGauge(final String fullGaugeName) { + Long value = _gaugeValues.get(fullGaugeName); + if (value == null) { + return 0; + } + return value; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 0b23fdb..d5a620e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -629,6 +629,11 @@ public class ControllerConf extends PropertiesConfiguration { initialDelayInSeconds); } + public void setOfflineSegmentIntervalCheckerInitialDelayInSeconds(long initialDelayInSeconds) { + setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS, + initialDelayInSeconds); + } + public long getPeriodicTaskInitialDelayInSeconds() { return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds(); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java index 08e7c41..7f19395 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java @@ -183,6 +183,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> return numTotalDocs; } + @VisibleForTesting + public ValidationMetrics getValidationMetrics() { + return _validationMetrics; + } + @Override public void cleanUpTask() { LOGGER.info("Unregister all the validation metrics."); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java index a947373..a0a2c5c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java @@ -31,9 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; -import kafka.server.KafkaServerStartable; import org.apache.commons.io.FileUtils; -import org.apache.helix.HelixAdmin; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.config.TableConfig; @@ -45,10 +43,12 @@ import org.apache.pinot.common.data.Schema; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.ValidationMetrics; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils; import org.apache.pinot.util.TestUtils; @@ -74,7 +74,8 @@ import org.testng.annotations.Test; * See group = "segmentStatusChecker" for example. * The tables needed for the test will be created in beforeTask(), and dropped in afterTask() * - * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> brokerResourceValidationManager -> .... + * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> + * brokerResourceValidationManager -> OfflineSegmentIntervalChecker .... */ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet { @@ -107,8 +108,10 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS); controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS); controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ); - controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_FREQ_SECONDS); + controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS); controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS); + controllerConf.setOfflineSegmentIntervalCheckerInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS); + controllerConf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS); startController(controllerConf); startBroker(); @@ -472,7 +475,31 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat dropOfflineTable(table2); } - // TODO: tests for other ControllerPeriodicTasks (RetentionManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager) + @Test(groups = "offlineSegmentIntervalChecker", dependsOnGroups = "brokerResourceValidationManager") + public void testOfflineSegmentIntervalChecker() + throws Exception { + OfflineSegmentIntervalChecker offlineSegmentIntervalChecker = _controllerStarter.getOfflineSegmentIntervalChecker(); + ValidationMetrics validationMetrics = offlineSegmentIntervalChecker.getValidationMetrics(); + + String tablNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME); + + // Wait until OfflineSegmentIntervalChecker gets executed + TestUtils.waitForCondition( + input -> validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")) + > 0, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker"); + + // Test the validation metrics values updated by OfflineSegmentIntervalChecker against the known values + // from segment metadata + Assert.assertEquals( + validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "SegmentCount")), 12); + Assert.assertEquals( + validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "missingSegmentCount")), 0); + Assert.assertEquals( + validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tablNameWithType, "TotalDocumentCount")), + 115545); + } + + // TODO: tests for other ControllerPeriodicTasks (RetentionManagert , RealtimeSegmentValidationManager) @Override protected boolean isUsingNewConfigFormat() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org