Jackie-Jiang commented on code in PR #15776:
URL: https://github.com/apache/pinot/pull/15776#discussion_r2093797708


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) {
   }
 
   /**
-   * Builds/rebuilds the routing for the physical table, for logical tables it is skipped.
-   * @param physicalOrLogicalTable a physical table with type or logical table name
+   * Builds the routing for a logical table. This method is called when a logical table is created or updated.
+   * @param logicalTableName the name of the logical table
    */
-  public synchronized void buildRouting(String physicalOrLogicalTable) {
-    // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) {
-      LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable);
+  public synchronized void buildRoutingForLogicalTable(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s",
+        logicalTableConfig);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName);
       return;
     }
 
-    String tableNameWithType = physicalOrLogicalTable;
+    LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName);
+
+    TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig();
+    Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+        "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy());
+    List<String> includedTables =
+        (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+    for (String tableNameWithType : includedTables) {
+      // skip time boundary manager init for realtime table
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+        LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType);
+        continue;
+      }
+
+      // skip hybrid tables, time boundary for such offline table already exists
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(
+          TableNameBuilder.extractRawTableName(tableNameWithType));
+      if (ZKMetadataProvider.isTableConfigExists(_propertyStore, realtimeTableName)) {

Review Comment:
   Not sure if this check is good enough. There is no guarantee that we will
   build routing for the realtime table.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2150,6 +2152,14 @@ public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
       updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
     }
 
+    @Nullable TimeBoundaryConfig oldTimeBoundaryConfig = oldLogicalTableConfig.getTimeBoundaryConfig();

Review Comment:
   (nit) We don't usually annotate local variables. The IDE should be able to
   read the annotation from the method signature.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) {
   }
 
   /**
-   * Builds/rebuilds the routing for the physical table, for logical tables it is skipped.
-   * @param physicalOrLogicalTable a physical table with type or logical table name
+   * Builds the routing for a logical table. This method is called when a logical table is created or updated.
+   * @param logicalTableName the name of the logical table
    */
-  public synchronized void buildRouting(String physicalOrLogicalTable) {
-    // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) {
-      LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable);
+  public synchronized void buildRoutingForLogicalTable(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s",
+        logicalTableConfig);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName);
       return;
     }
 
-    String tableNameWithType = physicalOrLogicalTable;
+    LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName);
+
+    TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig();
+    Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+        "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy());
+    List<String> includedTables =
+        (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+    for (String tableNameWithType : includedTables) {

Review Comment:
   Should we add a validation that every entry in `includedTables` is an
   offline table name?
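
   A minimal sketch of what such a validation could look like. The class
   `IncludedTablesValidator` and its methods are hypothetical and not part of
   this PR, and the plain `_OFFLINE` suffix check is an assumed stand-in for
   the `TableNameBuilder` helpers so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the PR.
final class IncludedTablesValidator {
  // Assumption: offline table names with type end in "_OFFLINE".
  private static final String OFFLINE_SUFFIX = "_OFFLINE";

  private IncludedTablesValidator() {
  }

  /** Returns the entries that are not offline table names (empty when all entries are valid). */
  static List<String> findNonOfflineTables(List<String> includedTables) {
    List<String> invalid = new ArrayList<>();
    for (String tableNameWithType : includedTables) {
      if (tableNameWithType == null || !tableNameWithType.endsWith(OFFLINE_SUFFIX)) {
        invalid.add(tableNameWithType);
      }
    }
    return invalid;
  }

  /** Throws IllegalArgumentException if any entry is not an offline table name. */
  static void validate(List<String> includedTables) {
    List<String> invalid = findNonOfflineTables(includedTables);
    if (!invalid.isEmpty()) {
      throw new IllegalArgumentException(
          "includedTables must contain only offline table names, found: " + invalid);
    }
  }
}
```

   Rejecting the config up front would make the realtime-table skip inside
   the loop unnecessary, since such entries would never reach it.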



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) {
   }
 
   /**
-   * Builds/rebuilds the routing for the physical table, for logical tables it is skipped.
-   * @param physicalOrLogicalTable a physical table with type or logical table name
+   * Builds the routing for a logical table. This method is called when a logical table is created or updated.
+   * @param logicalTableName the name of the logical table
    */
-  public synchronized void buildRouting(String physicalOrLogicalTable) {
-    // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) {
-      LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable);
+  public synchronized void buildRoutingForLogicalTable(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s",
+        logicalTableConfig);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName);
       return;
     }
 
-    String tableNameWithType = physicalOrLogicalTable;
+    LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName);
+
+    TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig();
+    Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+        "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy());
+    List<String> includedTables =
+        (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+    for (String tableNameWithType : includedTables) {
+      // skip time boundary manager init for realtime table
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+        LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType);
+        continue;
+      }
+
+      // skip hybrid tables, time boundary for such offline table already exists
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(
+          TableNameBuilder.extractRawTableName(tableNameWithType));
+      if (ZKMetadataProvider.isTableConfigExists(_propertyStore, realtimeTableName)) {
+        LOGGER.info("Skip time boundary manager init for for hybrid table: {}", tableNameWithType);
+        continue;
+      }
+
+      // build routing if it does not exist for the offline table
+      if (!_routingEntryMap.containsKey(tableNameWithType)) {
+        buildRouting(tableNameWithType);

Review Comment:
   We probably want to handle exceptions here in case routing cannot be
   properly built.
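
   A rough sketch of one way to do this, assuming a failure for one table
   should be logged and should not abort the remaining tables (that policy is
   an assumption, not something the PR states). `SafeRoutingBuild` and the
   `routingBuilder` callback are hypothetical names; the callback stands in
   for the `buildRouting(tableNameWithType)` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of the PR.
final class SafeRoutingBuild {
  private SafeRoutingBuild() {
  }

  /**
   * Invokes routingBuilder for each table in order and returns the tables for which it threw.
   * routingBuilder is a stand-in for buildRouting(tableNameWithType).
   */
  static List<String> buildAll(List<String> tables, Consumer<String> routingBuilder) {
    List<String> failed = new ArrayList<>();
    for (String tableNameWithType : tables) {
      try {
        routingBuilder.accept(tableNameWithType);
      } catch (Exception e) {
        // Record the failure and continue with the remaining tables
        System.err.println("Failed to build routing for table: " + tableNameWithType + ", cause: " + e);
        failed.add(tableNameWithType);
      }
    }
    return failed;
  }
}
```

   The returned list lets the caller decide whether a partial failure should
   fail the whole logical table or only be reported.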



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -418,17 +420,74 @@ public synchronized void includeServerToRouting(String instanceId) {
   }
 
   /**
-   * Builds/rebuilds the routing for the physical table, for logical tables it is skipped.
-   * @param physicalOrLogicalTable a physical table with type or logical table name
+   * Builds the routing for a logical table. This method is called when a logical table is created or updated.
+   * @param logicalTableName the name of the logical table
    */
-  public synchronized void buildRouting(String physicalOrLogicalTable) {
-    // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) {
-      LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable);
+  public synchronized void buildRoutingForLogicalTable(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s",
+        logicalTableConfig);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName);
       return;
     }
 
-    String tableNameWithType = physicalOrLogicalTable;
+    LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName);
+
+    TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig();
+    Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+        "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy());
+    List<String> includedTables =
+        (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+    for (String tableNameWithType : includedTables) {
+      // skip time boundary manager init for realtime table
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+        LOGGER.info("Skip time boundary manager init for realtime table: {}", tableNameWithType);
+        continue;
+      }
+
+      // skip hybrid tables, time boundary for such offline table already exists
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(
+          TableNameBuilder.extractRawTableName(tableNameWithType));
+      if (ZKMetadataProvider.isTableConfigExists(_propertyStore, realtimeTableName)) {
+        LOGGER.info("Skip time boundary manager init for for hybrid table: {}", tableNameWithType);
+        continue;
+      }
+
+      // build routing if it does not exist for the offline table
+      if (!_routingEntryMap.containsKey(tableNameWithType)) {
+        buildRouting(tableNameWithType);

Review Comment:
   Are we going to build routing multiple times? Is it possible to maintain a
   set of tables that require time boundary management, and then build routing
   only once for all of them?
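
   One possible reading of this suggestion, as a sketch only: collect the
   offline tables referenced by all logical tables into a set first, so that
   each table appears once no matter how many logical tables include it.
   `TimeBoundaryTableCollector` and the shape of its input map (logical table
   name to its `includedTables`) are assumptions made for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the PR.
final class TimeBoundaryTableCollector {
  private TimeBoundaryTableCollector() {
  }

  /**
   * Returns the union of the included tables across all logical tables.
   * A LinkedHashSet keeps first-seen order and drops repeated table names.
   */
  static Set<String> collect(Map<String, List<String>> includedTablesByLogicalTable) {
    Set<String> tablesRequiringTimeBoundary = new LinkedHashSet<>();
    for (List<String> includedTables : includedTablesByLogicalTable.values()) {
      tablesRequiringTimeBoundary.addAll(includedTables);
    }
    return tablesRequiringTimeBoundary;
  }
}
```

   Routing could then be built in a single pass over the returned set instead
   of once per logical table that references a given offline table.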


