This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bc07b8dd2c Metric for upsert tables count (#12505) bc07b8dd2c is described below commit bc07b8dd2c953b8f88967d56c1c28af1873ab158 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Wed Feb 28 21:00:18 2024 +0500 Metric for upsert tables count (#12505) * Metric for upsert table count * move other table metric computation to updateTableConfigMetrics --- .../pinot/common/metrics/ControllerGauge.java | 1 + .../controller/helix/SegmentStatusChecker.java | 19 +++++++----- .../ControllerPeriodicTasksIntegrationTest.java | 36 ++++++++++++++++++++-- .../tests/UpsertTableIntegrationTest.java | 8 +++-- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 82e86c55a9..938242ef78 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -54,6 +54,7 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { REALTIME_TABLE_COUNT("TableCount", true), OFFLINE_TABLE_COUNT("TableCount", true), DISABLED_TABLE_COUNT("TableCount", true), + UPSERT_TABLE_COUNT("TableCount", true), PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true), TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE("TimeMsSinceLastMinionTaskMetadataUpdate", false), TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION("TimeMsSinceLastSuccessfulMinionTaskGeneration", false), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index dd598b2b21..1ac7446592 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -117,7 +117,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh protected void processTable(String tableNameWithType, Context context) { try { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); - updateTableConfigMetrics(tableNameWithType, tableConfig); + updateTableConfigMetrics(tableNameWithType, tableConfig, context); updateSegmentMetrics(tableNameWithType, tableConfig, context); updateTableSizeMetrics(tableNameWithType); } catch (Exception e) { @@ -132,6 +132,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh protected void postprocess(Context context) { _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, context._realTimeTableCount); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, context._offlineTableCount); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.UPSERT_TABLE_COUNT, context._upsertTableCount); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, context._disabledTables.size()); //emit a 0 for tables that are not paused/disabled. This makes alert expressions simpler as we don't have to deal @@ -156,12 +157,20 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh * Updates metrics related to the table config. * If table config not found, resets the metrics */ - private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig) { + private void updateTableConfigMetrics(String tableNameWithType, TableConfig tableConfig, Context context) { if (tableConfig == null) { LOGGER.warn("Found null table config for table: {}. 
Resetting table config metrics.", tableNameWithType); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, 0); return; } + if (tableConfig.getTableType() == TableType.OFFLINE) { + context._offlineTableCount++; + } else { + context._realTimeTableCount++; + } + if (tableConfig.isUpsertEnabled()) { + context._upsertTableCount++; + } int replication = tableConfig.getReplication(); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.REPLICATION_FROM_CONFIG, replication); } @@ -177,11 +186,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh */ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableConfig, Context context) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - if (tableType == TableType.OFFLINE) { - context._offlineTableCount++; - } else { - context._realTimeTableCount++; - } IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); @@ -386,6 +390,7 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh private boolean _logDisabledTables; private int _realTimeTableCount; private int _offlineTableCount; + private int _upsertTableCount; private Set<String> _processedTables = new HashSet<>(); private Set<String> _disabledTables = new HashSet<>(); private Set<String> _pausedTables = new HashSet<>(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 6b558b93e7..03a2b6a000 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -19,8 +19,10 @@ 
package org.apache.pinot.integration.tests; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,6 +42,7 @@ import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TagOverrideConfig; import org.apache.pinot.spi.config.table.TenantConfig; +import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -75,11 +78,18 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati private String _currentTable = DEFAULT_TABLE_NAME; + private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME; + @Override protected String getTableName() { return _currentTable; } + @Override + protected String getSchemaFileName() { + return _schemaFileName; + } + @Override protected int getNumReplicas() { return NUM_REPLICAS; @@ -182,6 +192,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati String emptyTable = "emptyTable"; String disabledTable = "disabledTable"; String tableWithOfflineSegment = "tableWithOfflineSegment"; + String upsertTable = "upsertTable"; Schema schema = createSchema(); _currentTable = emptyTable; @@ -195,6 +206,11 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati addTableConfig(createOfflineTableConfig()); _helixAdmin.enableResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), false); + _currentTable = upsertTable; + _schemaFileName = UpsertTableIntegrationTest.UPSERT_SCHEMA_FILE_NAME; + setupUpsertTable(); + _schemaFileName = DEFAULT_SCHEMA_FILE_NAME; + _currentTable = tableWithOfflineSegment; 
schema.setSchemaName(_currentTable); addSchema(schema); @@ -211,7 +227,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati _currentTable = DEFAULT_TABLE_NAME; - int numTables = 5; + int numTables = 6; ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics(); TestUtils.waitForCondition(aVoid -> { if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, "SegmentStatusChecker", @@ -246,8 +262,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati return false; } return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.OFFLINE_TABLE_COUNT) == 4 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 1 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1; + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 2 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.UPSERT_TABLE_COUNT) == 1; }, 60_000, "Timed out waiting for SegmentStatusChecker"); dropOfflineTable(emptyTable); @@ -258,6 +275,19 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati deleteSchema(tableWithOfflineSegment); } + private void setupUpsertTable() + throws IOException { + Schema upsertSchema = createSchema(); + upsertSchema.setSchemaName(getTableName()); + upsertSchema.getDateTimeFieldSpecs().get(0).setName(UpsertTableIntegrationTest.TIME_COL_NAME); + addSchema(upsertSchema); + TableConfig tableConfig = + createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), getNumKafkaPartitions(), new HashMap<>(), + new UpsertConfig(UpsertConfig.Mode.FULL), UpsertTableIntegrationTest.PRIMARY_KEY_COL); + tableConfig.getValidationConfig().setTimeColumnName(UpsertTableIntegrationTest.TIME_COL_NAME); + 
addTableConfig(tableConfig); + } + private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType, IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState, long expectedPercentSegmentsAvailable) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java index 342a8b01f3..238d515b54 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java @@ -82,8 +82,10 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { private static final String CSV_DELIMITER = ","; private static final String TABLE_NAME = "gameScores"; private static final int NUM_SERVERS = 2; - private static final String PRIMARY_KEY_COL = "playerId"; private static final String DELETE_COL = "deleted"; + public static final String PRIMARY_KEY_COL = "playerId"; + public static final String TIME_COL_NAME = "timestampInEpoch"; + public static final String UPSERT_SCHEMA_FILE_NAME = "upsert_table_test.schema"; protected PinotTaskManager _taskManager; protected PinotHelixTaskResourceManager _helixTaskResourceManager; @@ -133,13 +135,13 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { @Override protected String getSchemaFileName() { - return "upsert_table_test.schema"; + return UPSERT_SCHEMA_FILE_NAME; } @Nullable @Override protected String getTimeColumnName() { - return "timestampInEpoch"; + return TIME_COL_NAME; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org