Jackie-Jiang commented on code in PR #15776: URL: https://github.com/apache/pinot/pull/15776#discussion_r2093797708
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java: ########## @@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) { } /** - * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. - * @param physicalOrLogicalTable a physical table with type or logical table name + * Builds the routing for a logical table. This method is called when a logical table is created or updated. + * @param logicalTableName the name of the logical table */ - public synchronized void buildRouting(String physicalOrLogicalTable) { - // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { - LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); + public synchronized void buildRoutingForLogicalTable(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableConfig); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName); return; } - String tableNameWithType = physicalOrLogicalTable; + LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName); + + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), + "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); + List<String> includedTables = + (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + + for (String tableNameWithType : includedTables) { + // skip time boundary manager init for realtime table + if 
(TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType); + continue; + } + + // skip hybrid tables, time boundary for such offline table already exists + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType( + TableNameBuilder.extractRawTableName(tableNameWithType)); + if (ZKMetadataProvider.isTableConfigExists(_propertyStore, realtimeTableName)) { Review Comment: Not sure if this is good enough. There is no guarantee we will build routing for the realtime table. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2150,6 +2152,14 @@ public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); } + @Nullable TimeBoundaryConfig oldTimeBoundaryConfig = oldLogicalTableConfig.getTimeBoundaryConfig(); Review Comment: (nit) We don't usually annotate local variables. The IDE should be able to read the annotation from the method signature. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java: ########## @@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) { } /** - * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. - * @param physicalOrLogicalTable a physical table with type or logical table name + * Builds the routing for a logical table. This method is called when a logical table is created or updated.
+ * @param logicalTableName the name of the logical table */ - public synchronized void buildRouting(String physicalOrLogicalTable) { - // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { - LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); + public synchronized void buildRoutingForLogicalTable(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableConfig); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName); return; } - String tableNameWithType = physicalOrLogicalTable; + LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName); + + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), + "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); + List<String> includedTables = + (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + + for (String tableNameWithType : includedTables) { Review Comment: Should we add a validation that all entries in `includedTables` are offline table names? ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java: ########## @@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) { } /** - * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. - * @param physicalOrLogicalTable a physical table with type or logical table name + * Builds the routing for a logical table.
This method is called when a logical table is created or updated. + * @param logicalTableName the name of the logical table */ - public synchronized void buildRouting(String physicalOrLogicalTable) { - // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { - LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); + public synchronized void buildRoutingForLogicalTable(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableConfig); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName); return; } - String tableNameWithType = physicalOrLogicalTable; + LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName); + + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), + "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); + List<String> includedTables = + (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + + for (String tableNameWithType : includedTables) { + // skip time boundary manager init for realtime table + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType); + continue; + } + + // skip hybrid tables, time boundary for such offline table already exists + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType( + TableNameBuilder.extractRawTableName(tableNameWithType)); + if (ZKMetadataProvider.isTableConfigExists(_propertyStore, 
realtimeTableName)) { + LOGGER.info("Skip time boundary manager init for for hybrid table: {}", tableNameWithType); + continue; + } + + // build routing if it does not exist for the offline table + if (!_routingEntryMap.containsKey(tableNameWithType)) { + buildRouting(tableNameWithType); Review Comment: We probably want to handle exceptions here in case routing cannot be properly built. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java: ########## @@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) { } /** - * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. - * @param physicalOrLogicalTable a physical table with type or logical table name + * Builds the routing for a logical table. This method is called when a logical table is created or updated. + * @param logicalTableName the name of the logical table */ - public synchronized void buildRouting(String physicalOrLogicalTable) { - // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { - LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); + public synchronized void buildRoutingForLogicalTable(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableConfig); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName); return; } - String tableNameWithType = physicalOrLogicalTable; + LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName); + + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); +
Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), + "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); + List<String> includedTables = + (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + + for (String tableNameWithType : includedTables) { + // skip time boundary manager init for realtime table + if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType); + continue; + } + + // skip hybrid tables, time boundary for such offline table already exists + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType( + TableNameBuilder.extractRawTableName(tableNameWithType)); + if (ZKMetadataProvider.isTableConfigExists(_propertyStore, realtimeTableName)) { + LOGGER.info("Skip time boundary manager init for for hybrid table: {}", tableNameWithType); + continue; + } + + // build routing if it does not exist for the offline table + if (!_routingEntryMap.containsKey(tableNameWithType)) { + buildRouting(tableNameWithType); Review Comment: Are we going to build routing multiple times? Is it possible to maintain a set of tables that require time boundary management, and then build routing only once for all tables? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org