This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 00b5a73b20 Keep get tables API with and without database (#12804) 00b5a73b20 is described below commit 00b5a73b20415aaea0f9bb4f44c07ad21f563193 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Apr 8 01:10:31 2024 -0700 Keep get tables API with and without database (#12804) --- .../pinot/controller/BaseControllerStarter.java | 125 +++--- .../api/resources/PinotBrokerRestletResource.java | 13 +- .../PinotLeadControllerRestletResource.java | 3 +- .../helix/core/PinotHelixResourceManager.java | 79 ++-- .../core/cleanup/StaleInstancesCleanupTask.java | 9 +- .../helix/core/minion/PinotTaskManager.java | 25 +- .../core/periodictask/ControllerPeriodicTask.java | 9 +- .../helix/RealtimeConsumerMonitorTest.java | 85 ++--- .../controller/helix/SegmentStatusCheckerTest.java | 425 ++++++++++----------- .../periodictask/ControllerPeriodicTaskTest.java | 20 +- .../helix/core/retention/RetentionManagerTest.java | 71 ++-- 11 files changed, 376 insertions(+), 488 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index f78a49f2f9..91df2e0743 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -201,8 +201,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { _helixClusterName = _config.getHelixClusterName(); ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER); - PinotInsecureMode.setPinotInInsecureMode( - Boolean.valueOf(_config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, + PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf( + _config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); setupHelixSystemProperties(); @@ -531,8 +531,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { if (tableConfig != null) { Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); try { - StreamConfig.validateConsumerType( - streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap); + StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), + streamConfigMap); } catch (Exception e) { existingHlcTables.add(rt); } @@ -587,66 +587,63 @@ public abstract class BaseControllerStarter implements ServiceStartable { AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger(); ZkHelixPropertyStore<ZNRecord> propertyStore = _helixResourceManager.getPropertyStore(); - _helixResourceManager.getDatabaseNames().stream() - .map(_helixResourceManager::getAllTables) - .flatMap(List::stream) - .forEach(tableNameWithType -> { - Pair<TableConfig, Integer> tableConfigWithVersion = - ZKMetadataProvider.getTableConfigWithVersion(propertyStore, tableNameWithType); - if (tableConfigWithVersion == null) { - // This might due to table deletion, just log it here. - LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", - tableNameWithType); - return; - } - TableConfig tableConfig = tableConfigWithVersion.getLeft(); - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName); - boolean schemaExists = propertyStore.exists(schemaPath, AccessOption.PERSISTENT); - String existSchemaName = tableConfig.getValidationConfig().getSchemaName(); - if (existSchemaName == null || existSchemaName.equals(rawTableName)) { - // Although the table config is valid, we still need to ensure the schema exists - if (!schemaExists) { - LOGGER.warn("Failed to find schema for table: {}", tableNameWithType); - tableWithoutSchemaCount.getAndIncrement(); - return; - } - // Table config is already in good status - return; - } - misconfiguredTableCount.getAndIncrement(); - if (schemaExists) { - // If a schema named `rawTableName` already exists, then likely this is a misconfiguration. - // Reset schema name in table config to null to let the table point to the existing schema. - LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName, - existSchemaName); - } else { - // Copy the schema current table referring to to `rawTableName` if it does not exist - Schema schema = _helixResourceManager.getSchema(existSchemaName); - if (schema == null) { - LOGGER.warn("Failed to find schema: {} for table: {}", existSchemaName, tableNameWithType); - tableWithoutSchemaCount.getAndIncrement(); - return; - } - schema.setSchemaName(rawTableName); - if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) { - LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName); - } else { - LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, rawTableName); - failedToCopySchemaCount.getAndIncrement(); - return; - } - } - // Update table config to remove schema name - tableConfig.getValidationConfig().setSchemaName(null); - if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, tableConfigWithVersion.getRight())) { - LOGGER.info("Removed schema name from table config for table: {}", tableNameWithType); - fixedSchemaTableCount.getAndIncrement(); - } else { - LOGGER.warn("Failed to update table config for table: {}", tableNameWithType); - failedToUpdateTableConfigCount.getAndIncrement(); - } - }); + _helixResourceManager.getAllTables().forEach(tableNameWithType -> { + Pair<TableConfig, Integer> tableConfigWithVersion = + ZKMetadataProvider.getTableConfigWithVersion(propertyStore, tableNameWithType); + if (tableConfigWithVersion == null) { + // This might due to table deletion, just log it here. + LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", + tableNameWithType); + return; + } + TableConfig tableConfig = tableConfigWithVersion.getLeft(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName); + boolean schemaExists = propertyStore.exists(schemaPath, AccessOption.PERSISTENT); + String existSchemaName = tableConfig.getValidationConfig().getSchemaName(); + if (existSchemaName == null || existSchemaName.equals(rawTableName)) { + // Although the table config is valid, we still need to ensure the schema exists + if (!schemaExists) { + LOGGER.warn("Failed to find schema for table: {}", tableNameWithType); + tableWithoutSchemaCount.getAndIncrement(); + return; + } + // Table config is already in good status + return; + } + misconfiguredTableCount.getAndIncrement(); + if (schemaExists) { + // If a schema named `rawTableName` already exists, then likely this is a misconfiguration. + // Reset schema name in table config to null to let the table point to the existing schema. + LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName, + existSchemaName); + } else { + // Copy the schema current table referring to to `rawTableName` if it does not exist + Schema schema = _helixResourceManager.getSchema(existSchemaName); + if (schema == null) { + LOGGER.warn("Failed to find schema: {} for table: {}", existSchemaName, tableNameWithType); + tableWithoutSchemaCount.getAndIncrement(); + return; + } + schema.setSchemaName(rawTableName); + if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) { + LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName); + } else { + LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, rawTableName); + failedToCopySchemaCount.getAndIncrement(); + return; + } + } + // Update table config to remove schema name + tableConfig.getValidationConfig().setSchemaName(null); + if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, tableConfigWithVersion.getRight())) { + LOGGER.info("Removed schema name from table config for table: {}", tableNameWithType); + fixedSchemaTableCount.getAndIncrement(); + } else { + LOGGER.warn("Failed to update table config for table: {}", tableNameWithType); + failedToUpdateTableConfigCount.getAndIncrement(); + } + }); LOGGER.info( "Found {} tables misconfigured, {} tables without schema. Successfully fixed schema for {} tables, failed to " + "fix {} tables due to copy schema failure, failed to fix {} tables due to update table config failure.", diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java index 1863819ab9..4fdca71a82 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java @@ -127,10 +127,8 @@ public class PinotBrokerRestletResource { @ApiOperation(value = "List tables to brokers mappings", notes = "List tables to brokers mappings") public Map<String, List<String>> getTablesToBrokersMapping( @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state, @Context HttpHeaders headers) { - Map<String, List<String>> resultMap = new HashMap<>(); - _pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE)) - .forEach(table -> resultMap.put(table, getBrokersForTable(table, null, state, headers))); - return resultMap; + return _pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE)).stream() + .collect(Collectors.toMap(table -> table, table -> getBrokersForTable(table, null, state, headers))); } @GET @@ -201,11 +199,8 @@ public class PinotBrokerRestletResource { @ApiOperation(value = "List tables to brokers mappings", notes = "List tables to brokers mappings") public Map<String, List<InstanceInfo>> getTablesToBrokersMappingV2( @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state, @Context HttpHeaders headers) { - Map<String, List<InstanceInfo>> resultMap = new HashMap<>(); - String databaseName = headers.getHeaderString(DATABASE); - _pinotHelixResourceManager.getAllRawTables(databaseName).stream() - .forEach(table -> resultMap.put(table, getBrokersForTableV2(table, null, state, headers))); - return resultMap; + return _pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE)).stream() + .collect(Collectors.toMap(table -> table, table -> getBrokersForTableV2(table, null, state, headers))); } @GET diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java index 5f09f74336..16f13e0471 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java @@ -109,8 +109,7 @@ public class PinotLeadControllerRestletResource { } // Assigns all the tables to the relevant partitions. - List<String> tableNames = _pinotHelixResourceManager.getAllTables( - headers.getHeaderString(DATABASE)); + List<String> tableNames = _pinotHelixResourceManager.getAllTables(headers.getHeaderString(DATABASE)); for (String tableName : tableNames) { String rawTableName = TableNameBuilder.extractRawTableName(tableName); int partitionId = LeadControllerUtils.getPartitionIdForTable(rawTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f1761ce866..680e475d63 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -716,13 +716,12 @@ public class PinotHelixResourceManager { } /** - * Get all table names (with type suffix) in default database. + * Get all table names (with type suffix) in all databases. * - * @return List of table names in default database + * @return List of table names */ - @Deprecated public List<String> getAllTables() { - return getAllTables(null); + return getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList()); } /** @@ -732,23 +731,18 @@ public class PinotHelixResourceManager { * @return List of table names in provided database name */ public List<String> getAllTables(@Nullable String databaseName) { - List<String> tableNames = new ArrayList<>(); - for (String resourceName : getAllResources()) { - if (TableNameBuilder.isTableResource(resourceName) - && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) { - tableNames.add(resourceName); - } - } - return tableNames; + return getAllResources().stream().filter( + resourceName -> TableNameBuilder.isTableResource(resourceName) && DatabaseUtils.isPartOfDatabase(resourceName, + databaseName)).collect(Collectors.toList()); } /** - * Get all offline table names from default database. + * Get all offline table names from all databases. * - * @return List of offline table names in default database + * @return List of offline table names */ public List<String> getAllOfflineTables() { - return getAllOfflineTables(null); + return getAllResources().stream().filter(TableNameBuilder::isOfflineTableResource).collect(Collectors.toList()); } /** @@ -758,23 +752,18 @@ public class PinotHelixResourceManager { * @return List of offline table names in provided database name */ public List<String> getAllOfflineTables(@Nullable String databaseName) { - List<String> offlineTableNames = new ArrayList<>(); - for (String resourceName : getAllResources()) { - if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName) - && TableNameBuilder.isOfflineTableResource(resourceName)) { - offlineTableNames.add(resourceName); - } - } - return offlineTableNames; + return getAllResources().stream().filter( + resourceName -> TableNameBuilder.isOfflineTableResource(resourceName) && DatabaseUtils.isPartOfDatabase( + resourceName, databaseName)).collect(Collectors.toList()); } /** - * Get all dimension table names from default database. + * Get all dimension table names from all databases. * - * @return List of dimension table names in default database + * @return List of dimension table names */ public List<String> getAllDimensionTables() { - return getAllDimensionTables(null); + return _tableCache.getAllDimensionTables(); } /** @@ -785,17 +774,16 @@ public class PinotHelixResourceManager { */ public List<String> getAllDimensionTables(@Nullable String databaseName) { return _tableCache.getAllDimensionTables().stream() - .filter(table -> DatabaseUtils.isPartOfDatabase(table, databaseName)) - .collect(Collectors.toList()); + .filter(table -> DatabaseUtils.isPartOfDatabase(table, databaseName)).collect(Collectors.toList()); } /** - * Get all realtime table names from default database. + * Get all realtime table names from all databases. * - * @return List of realtime table names in default database + * @return List of realtime table names */ public List<String> getAllRealtimeTables() { - return getAllRealtimeTables(null); + return getAllResources().stream().filter(TableNameBuilder::isRealtimeTableResource).collect(Collectors.toList()); } /** @@ -805,23 +793,19 @@ public class PinotHelixResourceManager { * @return List of realtime table names in provided database name */ public List<String> getAllRealtimeTables(@Nullable String databaseName) { - List<String> realtimeTableNames = new ArrayList<>(); - for (String resourceName : getAllResources()) { - if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName) - && TableNameBuilder.isRealtimeTableResource(resourceName)) { - realtimeTableNames.add(resourceName); - } - } - return realtimeTableNames; + return getAllResources().stream().filter( + resourceName -> TableNameBuilder.isRealtimeTableResource(resourceName) && DatabaseUtils.isPartOfDatabase( + resourceName, databaseName)).collect(Collectors.toList()); } /** - * Get all raw table names in default database. + * Get all raw table names in all databases. * - * @return List of raw table names in default database + * @return List of raw table names */ public List<String> getAllRawTables() { - return getAllRawTables(null); + return getAllResources().stream().filter(TableNameBuilder::isTableResource) + .map(TableNameBuilder::extractRawTableName).distinct().collect(Collectors.toList()); } /** @@ -831,14 +815,9 @@ public class PinotHelixResourceManager { * @return List of raw table names in provided database name */ public List<String> getAllRawTables(@Nullable String databaseName) { - Set<String> rawTableNames = new HashSet<>(); - for (String resourceName : getAllResources()) { - if (TableNameBuilder.isTableResource(resourceName) - && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) { - rawTableNames.add(TableNameBuilder.extractRawTableName(resourceName)); - } - } - return new ArrayList<>(rawTableNames); + return getAllResources().stream().filter( + resourceName -> TableNameBuilder.isTableResource(resourceName) && DatabaseUtils.isPartOfDatabase(resourceName, + databaseName)).map(TableNameBuilder::extractRawTableName).distinct().collect(Collectors.toList()); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java index 027712bde8..b257462985 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java @@ -138,12 +138,9 @@ public class StaleInstancesCleanupTask extends BasePeriodicTask { private Set<String> getServerInstancesInUse() { Set<String> serverInstancesInUse = new HashSet<>(); - _pinotHelixResourceManager.getDatabaseNames().stream() - .map(_pinotHelixResourceManager::getAllTables) - .flatMap(List::stream) - .forEach(tableName -> serverInstancesInUse.addAll( - Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName)) - .map(is -> is.getInstanceSet(tableName)).orElse(Collections.emptySet()))); + _pinotHelixResourceManager.getAllTables().forEach(tableName -> serverInstancesInUse.addAll( + Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName)) + .map(is -> is.getInstanceSet(tableName)).orElse(Collections.emptySet()))); return serverInstancesInUse; } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 9b2fced8c2..2cdbf8c1df 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -480,13 +480,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } /** - * Public API to schedule tasks (all task types) for all tables in default database. + * Public API to schedule tasks (all task types) for all tables in all databases. * It might be called from the non-leader controller. * Returns a map from the task type to the list of tasks scheduled. */ - @Deprecated public synchronized Map<String, List<String>> scheduleTasks() { - return scheduleTasks(_pinotHelixResourceManager.getAllTables(CommonConstants.DEFAULT_DATABASE), false); + return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false); } /** @@ -494,7 +493,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * It might be called from the non-leader controller. * Returns a map from the task type to the list of tasks scheduled. */ - public synchronized Map<String, List<String>> scheduleTasksForDatabase(String database) { + public synchronized Map<String, List<String>> scheduleTasksForDatabase(@Nullable String database) { return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), false); } @@ -605,8 +604,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } } catch (Exception e) { numErrorTasksScheduled++; - LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", - taskType, minionInstanceTag, pinotTaskConfigs, e); + LOGGER.error("Failed to schedule task type {} on minion instance {} with task configs: {}", taskType, + minionInstanceTag, pinotTaskConfigs, e); } } if (numErrorTasksScheduled > 0) { @@ -629,14 +628,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } /** - * Public API to schedule task for the given task type in default database. + * Public API to schedule task for the given task type in all databases. * It might be called from the non-leader controller. * Returns the list of task names, or {@code null} if no task is scheduled. */ - @Deprecated @Nullable public synchronized List<String> scheduleTask(String taskType) { - return scheduleTaskForDatabase(taskType, CommonConstants.DEFAULT_DATABASE); + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables()); } /** @@ -645,13 +643,18 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { * Returns the list of task name, or {@code null} if no task is scheduled. */ @Nullable - public synchronized List<String> scheduleTaskForDatabase(String taskType, String database) { + public synchronized List<String> scheduleTaskForDatabase(String taskType, @Nullable String database) { + return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(database)); + } + + @Nullable + private List<String> scheduleTask(String taskType, List<String> tables) { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType); // Scan all table configs to get the tables with task enabled List<TableConfig> enabledTableConfigs = new ArrayList<>(); - for (String tableNameWithType : _pinotHelixResourceManager.getAllTables(database)) { + for (String tableNameWithType : tables) { TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig() .isTaskTypeEnabled(taskType)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 47dc218f43..d3584df068 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.periodictask; import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Properties; @@ -69,12 +68,8 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { // Check if we have a specific table against which this task needs to be run. String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME); // Process the tables that are managed by this controller - List<String> allTables = propTableNameWithType == null - ? _pinotHelixResourceManager.getDatabaseNames().stream() - .map(_pinotHelixResourceManager::getAllTables) - .flatMap(List::stream) - .collect(Collectors.toList()) - : Collections.singletonList(propTableNameWithType); + List<String> allTables = + propTableNameWithType != null ? List.of(propTableNameWithType) : _pinotHelixResourceManager.getAllTables(); Set<String> currentLeaderOfTables = allTables.stream() .filter(_leadControllerManager::isLeaderForTable) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java index 051fd784b6..26928deb3c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java @@ -18,9 +18,6 @@ */ package org.apache.pinot.controller.helix; -import com.google.common.collect.ImmutableMap; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -43,15 +40,14 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; public class RealtimeConsumerMonitorTest { @@ -59,17 +55,16 @@ public class RealtimeConsumerMonitorTest { @Test public void realtimeBasicTest() throws Exception { - final String tableName = "myTable_REALTIME"; - final String rawTableName = TableNameBuilder.extractRawTableName(tableName); - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); + String rawTableName = "myTable"; + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn") + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn") .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build(); + LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 0, System.currentTimeMillis()); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); @@ -79,7 +74,7 @@ public class RealtimeConsumerMonitorTest { idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(realtimeTableName); externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); @@ -91,13 +86,11 @@ public class RealtimeConsumerMonitorTest { { helixResourceManager = mock(PinotHelixResourceManager.class); ZkHelixPropertyStore<ZNRecord> helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig); when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore); - when(helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView); ZNRecord znRecord = new ZNRecord("0"); znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); when(helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); @@ -121,61 +114,53 @@ public class RealtimeConsumerMonitorTest { // So, the consumer monitor should show: 1. partition-1 has 0 lag; partition-2 has some non-zero lag. // Segment 1 in replicas: TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> response = new TreeMap<>(); - List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part1ServerConsumingSegmentInfo = new ArrayList<>(2); - part1ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0")); - part1ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0")); - + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part1ServerConsumingSegmentInfo = + List.of(getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"), + getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0")); response.put(segmentPartition1Seq1.getSegmentName(), part1ServerConsumingSegmentInfo); // Segment 2 in replicas - List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part2ServerConsumingSegmentInfo = new ArrayList<>(2); - part2ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0")); - part2ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000")); - + List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part2ServerConsumingSegmentInfo = + List.of(getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"), + getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000")); response.put(segmentPartition2Seq0.getSegmentName(), part2ServerConsumingSegmentInfo); ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class); - when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000)) - .thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0)); + when(consumingSegmentReader.getConsumingSegmentsInfo(realtimeTableName, 10000)).thenReturn( + new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0)); RealtimeConsumerMonitor realtimeConsumerMonitor = - new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, - controllerMetrics, consumingSegmentReader); + new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, controllerMetrics, + consumingSegmentReader); realtimeConsumerMonitor.start(); realtimeConsumerMonitor.run(); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, + + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1, ControllerGauge.MAX_RECORDS_LAG), 0); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2, ControllerGauge.MAX_RECORDS_LAG), 40); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, - ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1, + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2, ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000); } ConsumingSegmentInfoReader.ConsumingSegmentInfo getConsumingSegmentInfoForServer(String serverName, String partitionId, String currentOffset, String upstreamLatestOffset, String availabilityLagMs) { - Map<String, String> currentOffsetMap = Collections.singletonMap(partitionId, currentOffset); - Map<String, String> latestUpstreamOffsetMap = Collections.singletonMap(partitionId, upstreamLatestOffset); - Map<String, String> recordsLagMap = Collections.singletonMap(partitionId, String.valueOf( - Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset))); - Map<String, String> availabilityLagMsMap = Collections.singletonMap(partitionId, availabilityLagMs); + Map<String, String> currentOffsetMap = Map.of(partitionId, currentOffset); + Map<String, String> latestUpstreamOffsetMap = Map.of(partitionId, upstreamLatestOffset); + Map<String, String> recordsLagMap = + Map.of(partitionId, String.valueOf(Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset))); + Map<String, String> availabilityLagMsMap = Map.of(partitionId, availabilityLagMs); ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo = new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, latestUpstreamOffsetMap, recordsLagMap, availabilityLagMsMap); - return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, - currentOffsetMap, partitionOffsetInfo); + return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, currentOffsetMap, + partitionOffsetInfo); } Map<String, String> getStreamConfigMap() { - return ImmutableMap.of( - "streamType", "kafka", - "stream.kafka.consumer.type", "simple", - "stream.kafka.topic.name", "test", + return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index a1dd8f2697..3161c9da20 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -18,10 +18,6 @@ */ package org.apache.pinot.controller.helix; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -52,19 +48,21 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public class SegmentStatusCheckerTest { + private final ExecutorService _executorService = Executors.newFixedThreadPool(1); + private SegmentStatusChecker _segmentStatusChecker; private PinotHelixResourceManager _helixResourceManager; private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; @@ -73,18 +71,15 @@ public class SegmentStatusCheckerTest { private ControllerMetrics _controllerMetrics; private ControllerConf _config; private TableSizeReader _tableSizeReader; - private ExecutorService _executorService = Executors.newFixedThreadPool(1); @Test public void offlineBasicTest() throws Exception { - final String tableName = "myTable_OFFLINE"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); + String offlineTableName = "myTable_OFFLINE"; TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(2).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build(); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -101,7 +96,7 @@ public class SegmentStatusCheckerTest { idealState.setReplicas("2"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(offlineTableName); externalView.setState("myTable_0", "pinot1", "ONLINE"); externalView.setState("myTable_0", "pinot2", "ONLINE"); externalView.setState("myTable_1", "pinot1", "ERROR"); @@ -114,27 +109,23 @@ public class SegmentStatusCheckerTest { { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); } { _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, {myTable_3 -> myTable_4, IN_PROGRESS}, // myTable_1 and myTable_4 will be skipped for the metrics. - SegmentLineage segmentLineage = new SegmentLineage(tableName); + SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(Collections.singletonList("myTable_1"), Collections.singletonList("myTable_3"), - LineageEntryState.COMPLETED, 11111L)); + new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), LineageEntryState.COMPLETED, 11111L)); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(Collections.singletonList("myTable_3"), Collections.singletonList("myTable_4"), - LineageEntryState.IN_PROGRESS, 11111L)); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), eq(AccessOption.PERSISTENT))) - .thenReturn(segmentLineage.toZNRecord()); + new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), LineageEntryState.IN_PROGRESS, 11111L)); + when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), + eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); } { _config = mock(ControllerConf.class); @@ -158,40 +149,41 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, offlineTableName, ControllerGauge.REPLICATION_FROM_CONFIG), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENT_COUNT), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENT_COUNT), + 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 66); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 66); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void realtimeBasicTest() throws Exception { - final String tableName = "myTable_REALTIME"; - final String rawTableName = TableNameBuilder.extractRawTableName(tableName); - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); + String rawTableName = "myTable"; + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn") + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn") .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build(); - final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); - final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); - final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis()); - IdealState idealState = new IdealState(tableName); + + LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); + LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); + LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis()); + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE"); idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE"); idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE"); @@ -204,7 +196,7 @@ public class SegmentStatusCheckerTest { idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(realtimeTableName); externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE"); externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE"); externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE"); @@ -218,13 +210,11 @@ public class SegmentStatusCheckerTest { { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView); ZNRecord znRecord = new ZNRecord("0"); znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); when(_helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); @@ -251,27 +241,25 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.REPLICATION_FROM_CONFIG), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.REPLICATION_FROM_CONFIG), 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } Map<String, String> getStreamConfigMap() { - return ImmutableMap.of( - "streamType", "kafka", - "stream.kafka.consumer.type", "simple", - "stream.kafka.topic.name", "test", + return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); @@ -281,8 +269,7 @@ public class SegmentStatusCheckerTest { public void missingEVPartitionTest() throws Exception { String offlineTableName = "myTable_OFFLINE"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(offlineTableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); @@ -317,21 +304,19 @@ public class SegmentStatusCheckerTest { ZkHelixPropertyStore<ZNRecord> propertyStore; { propertyStore = (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class); - when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, AccessOption.PERSISTENT)) - .thenReturn(znrecord); + when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, AccessOption.PERSISTENT)).thenReturn( + znrecord); } { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_3")) - .thenReturn(new SegmentZKMetadata(znrecord)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_3")).thenReturn( + new SegmentZKMetadata(znrecord)); } { _config = mock(ControllerConf.class); @@ -355,25 +340,25 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); } @Test public void missingEVTest() throws Exception { - final String tableName = "myTable_REALTIME"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + String realtimeTableName = "myTable_REALTIME"; + + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -388,11 +373,9 @@ public class SegmentStatusCheckerTest { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -416,30 +399,28 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void missingIdealTest() throws Exception { - final String tableName = "myTable_REALTIME"; - List<String> allTableNames = new ArrayList<>(); - allTableNames.add(tableName); + String realtimeTableName = "myTable_REALTIME"; { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(null); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -463,24 +444,24 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.TABLE_COMPRESSED_SIZE)); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS)); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, + ControllerGauge.TABLE_COMPRESSED_SIZE)); } @Test public void missingEVPartitionPushTest() throws Exception { String offlineTableName = "myTable_OFFLINE"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(offlineTableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot1", "ONLINE"); @@ -527,15 +508,13 @@ public class SegmentStatusCheckerTest { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")) - .thenReturn(new SegmentZKMetadata(znrecord)); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")) - .thenReturn(new SegmentZKMetadata(znrecord2)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")).thenReturn( + new SegmentZKMetadata(znrecord)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")).thenReturn( + new SegmentZKMetadata(znrecord2)); } { _config = mock(ControllerConf.class); @@ -559,27 +538,27 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void noReplicas() throws Exception { - final String tableName = "myTable_REALTIME"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + String realtimeTableName = "myTable_REALTIME"; + + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE"); idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE"); idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE"); @@ -590,11 +569,9 @@ public class SegmentStatusCheckerTest { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -618,26 +595,26 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + 1); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), + 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } @Test - public void disabledTableTest() - throws Exception { + public void disabledTableTest() { + String offlineTableName = "myTable_OFFLINE"; - final String tableName = "myTable_OFFLINE"; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); // disable table in idealstate idealState.enable(false); idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE"); @@ -648,11 +625,9 @@ public class SegmentStatusCheckerTest { { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -669,23 +644,21 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); + // verify state before test - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); + // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test - public void disabledEmptyTableTest() - throws Exception { + public void disabledEmptyTableTest() { + String offlineTableName = "myTable_OFFLINE"; - final String tableName = "myTable_OFFLINE"; - List<String> allTableNames = Lists.newArrayList(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); // disable table in idealstate idealState.enable(false); idealState.setReplicas("1"); @@ -693,11 +666,9 @@ public class SegmentStatusCheckerTest { { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -714,14 +685,14 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); + // verify state before test - Assert.assertFalse( - MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); + assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); + // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test @@ -734,22 +705,20 @@ public class SegmentStatusCheckerTest { @Test public void lessThanOnePercentSegmentsUnavailableTest() - throws Exception { - String tableName = "myTable_OFFLINE"; - int numSegments = 200; - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); + throws Exception { + String offlineTableName = "myTable_OFFLINE"; TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(1).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build(); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); + int numSegments = 200; for (int i = 0; i < numSegments; i++) { idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE"); } idealState.setReplicas("1"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(offlineTableName); externalView.setState("myTable_0", "pinot1", "OFFLINE"); for (int i = 1; i < numSegments; i++) { externalView.setState("myTable_" + i, "pinot1", "ONLINE"); @@ -757,19 +726,17 @@ public class SegmentStatusCheckerTest { { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); } { _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - SegmentLineage segmentLineage = new SegmentLineage(tableName); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), eq(AccessOption.PERSISTENT))) - .thenReturn(segmentLineage.toZNRecord()); + SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); + when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), + eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); } { _config = mock(ControllerConf.class); @@ -788,37 +755,35 @@ public class SegmentStatusCheckerTest { _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _executorService); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99); + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99); } public void noSegmentsInternal(final int nReplicas) throws Exception { - final String tableName = "myTable_REALTIME"; + String realtimeTableName = "myTable_REALTIME"; + String nReplicasStr = Integer.toString(nReplicas); int nReplicasExpectedValue = nReplicas; if (nReplicas < 0) { nReplicasStr = "abc"; nReplicasExpectedValue = 1; } - List<String> allTableNames = new ArrayList<String>(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(realtimeTableName); idealState.setReplicas(nReplicasStr); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -843,15 +808,17 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), nReplicasExpectedValue); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + nReplicasExpectedValue); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), + 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 15c3cf6d81..f4e0eb46b1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.helix.core.periodictask; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,7 +34,6 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -89,9 +87,7 @@ public class ControllerPeriodicTaskTest { public void beforeTest() { List<String> tables = new ArrayList<>(_numTables); IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE")); - when(_resourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(tables); + when(_resourceManager.getAllTables()).thenReturn(tables); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } @@ -109,7 +105,6 @@ public class ControllerPeriodicTaskTest { _task.getInitialDelayInSeconds() >= ControllerConf.ControllerPeriodicTasksConf.MIN_INITIAL_DELAY_IN_SECONDS); assertTrue( _task.getInitialDelayInSeconds() < ControllerConf.ControllerPeriodicTasksConf.MAX_INITIAL_DELAY_IN_SECONDS); - assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS); } @@ -124,7 +119,7 @@ public class ControllerPeriodicTaskTest { assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -133,8 +128,7 @@ public class ControllerPeriodicTaskTest { assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), - _numTables); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); @@ -145,7 +139,7 @@ public class ControllerPeriodicTaskTest { assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertTrue(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -156,7 +150,7 @@ public class ControllerPeriodicTaskTest { assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertFalse(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -169,7 +163,7 @@ public class ControllerPeriodicTaskTest { assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -178,7 +172,7 @@ public class ControllerPeriodicTaskTest { assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index ce5e31e5ef..b3e656de9e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.helix.core.retention; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -43,15 +42,13 @@ import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.mockito.ArgumentMatchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class RetentionManagerTest { @@ -60,8 +57,7 @@ public class RetentionManagerTest { private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME); - private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp) - throws Exception { + private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp) { List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); // Create metadata for 10 segments really old, that will be removed by the retention manager. final int numOlderSegments = 10; @@ -105,8 +101,7 @@ public class RetentionManagerTest { } @Test - public void testRetentionWithMinutes() - throws Exception { + public void testRetentionWithMinutes() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; final long pastMinutesSinceEpoch = 22383360L; @@ -114,8 +109,7 @@ public class RetentionManagerTest { } @Test - public void testRetentionWithSeconds() - throws Exception { + public void testRetentionWithSeconds() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60; final long pastSecondsSinceEpoch = 1343001600L; @@ -123,8 +117,7 @@ public class RetentionManagerTest { } @Test - public void testRetentionWithMillis() - throws Exception { + public void testRetentionWithMillis() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60 * 1000; final long pastMillisSinceEpoch = 1343001600000L; @@ -132,8 +125,7 @@ public class RetentionManagerTest { } @Test - public void testRetentionWithHours() - throws Exception { + public void testRetentionWithHours() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24; final long pastHoursSinceEpoch = 373056L; @@ -141,8 +133,7 @@ public class RetentionManagerTest { } @Test - public void testRetentionWithDays() - throws Exception { + public void testRetentionWithDays() { final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long pastDaysSinceEpoch = 15544L; testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp); @@ -161,10 +152,8 @@ public class RetentionManagerTest { private void setupPinotHelixResourceManager(TableConfig tableConfig, final List<String> removedSegments, PinotHelixResourceManager resourceManager, LeadControllerManager leadControllerManager) { - final String tableNameWithType = tableConfig.getTableName(); - when(resourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(Collections.singletonList(tableNameWithType)); + String tableNameWithType = tableConfig.getTableName(); + when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType)); ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); when(resourceManager.getPropertyStore()).thenReturn(propertyStore); @@ -172,38 +161,27 @@ public class RetentionManagerTest { SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class); // Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we only test that the call is made once per // run of the retention manager - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) - throws Throwable { - return null; - } - }).when(deletionManager).removeAgedDeletedSegments(leadControllerManager); + doAnswer(invocationOnMock -> null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager); when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager); // If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted // are exactly the same as the ones we expect to be deleted. - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - Object[] args = invocationOnMock.getArguments(); - String tableNameArg = (String) args[0]; - Assert.assertEquals(tableNameArg, tableNameWithType); - List<String> segmentListArg = (List<String>) args[1]; - Assert.assertEquals(segmentListArg.size(), removedSegments.size()); - for (String segmentName : removedSegments) { - Assert.assertTrue(segmentListArg.contains(segmentName)); - } - return null; + doAnswer(invocationOnMock -> { + Object[] args = invocationOnMock.getArguments(); + String tableNameArg = (String) args[0]; + assertEquals(tableNameArg, tableNameWithType); + List<String> segmentListArg = (List<String>) args[1]; + assertEquals(segmentListArg.size(), removedSegments.size()); + for (String segmentName : removedSegments) { + assertTrue(segmentListArg.contains(segmentName)); } - }).when(resourceManager).deleteSegments(anyString(), ArgumentMatchers.anyList()); + return null; + }).when(resourceManager).deleteSegments(anyString(), anyList()); } // This test makes sure that we clean up the segments marked OFFLINE in realtime for more than 7 days @Test - public void testRealtimeLLCCleanup() - throws Exception { + public void testRealtimeLLCCleanup() { final int initialNumSegments = 8; final long now = System.currentTimeMillis(); @@ -237,8 +215,7 @@ public class RetentionManagerTest { // This test makes sure that we do not clean up last llc completed segments @Test - public void testRealtimeLastLLCCleanup() - throws Exception { + public void testRealtimeLastLLCCleanup() { final long now = System.currentTimeMillis(); final int replicaCount = 1; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org