This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c9f0c47d0a Logical table time boundary (#15776)
c9f0c47d0a is described below

commit c9f0c47d0ad96607760b706a79802d1598222ef3
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Tue May 20 11:55:29 2025 +0530

    Logical table time boundary (#15776)
    
    * Time boundary computation for SSE.
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Enhancing validations and tests for time boundary.
    
    * Addressing review comments.
    
    * Addressing review comments.
    
    * Addressing review comments.
    
    ---------
    
    Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai>
---
 ...okerResourceOnlineOfflineStateModelFactory.java |  22 ++--
 .../BrokerUserDefinedMessageHandlerFactory.java    |  27 +++++
 .../pinot/broker/routing/BrokerRoutingManager.java |  71 ++++++++++--
 .../messages/LogicalTableConfigRefreshMessage.java |  65 +++++++++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |   4 +
 .../common/utils/LogicalTableConfigUtils.java      |  29 +++++
 .../helix/core/PinotHelixResourceManager.java      |  31 +++++
 .../resources/PinotLogicalTableResourceTest.java   |  54 +++++++++
 .../pinot/controller/helix/ControllerTest.java     |   2 +
 .../BaseLogicalTableIntegrationTest.java           | 127 +++++++++++++++++----
 ...hOneOfflineOneRealtimeTableIntegrationTest.java |  44 +++++++
 ...alTableWithOneRealtimeTableIntegrationTest.java |  29 +++++
 ...elveOfflineOneRealtimeTableIntegrationTest.java |  43 +++++++
 ...hTwoOfflineOneRealtimeTableIntegrationTest.java |  77 +++++++++++++
 .../routing/table/LogicalTableRouteProvider.java   |  27 +++--
 .../timeboundary/MinTimeBoundaryStrategy.java      |  79 +++++++++++++
 .../query/timeboundary/TimeBoundaryStrategy.java   |  46 ++++++++
 .../timeboundary/TimeBoundaryStrategyService.java  |  71 ++++++++++++
 .../query/routing/table/BaseTableRouteTest.java    |  26 ++++-
 ...ogicalTableRouteProviderCalculateRouteTest.java |  14 +++
 .../LogicalTableRouteProviderGetRouteTest.java     |   7 ++
 .../timeboundary/MinTimeBoundaryStrategyTest.java  | 122 ++++++++++++++++++++
 .../TimeBoundaryStrategyServiceTest.java           |  40 +++++++
 .../apache/pinot/spi/data/LogicalTableConfig.java  |  14 +++
 .../apache/pinot/spi/data/TimeBoundaryConfig.java  |  52 +++++++++
 .../utils/builder/ControllerRequestURLBuilder.java |   8 ++
 .../utils/builder/LogicalTableConfigBuilder.java   |   8 ++
 27 files changed, 1087 insertions(+), 52 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 41f9e9ef87..dcf8a667e1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -75,18 +75,22 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
 
     @Transition(from = "OFFLINE", to = "ONLINE")
     public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
-      String tableNameWithType = message.getPartitionName();
-      LOGGER.info("Processing transition from OFFLINE to ONLINE for table: 
{}", tableNameWithType);
+      String physicalOrLogicalTable = message.getPartitionName();
+      LOGGER.info("Processing transition from OFFLINE to ONLINE for table: 
{}", physicalOrLogicalTable);
       try {
-        _routingManager.buildRouting(tableNameWithType);
-        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-        _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
-            
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
-        _queryQuotaManager.createDatabaseRateLimiter(
-            
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType));
+        if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
+          _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable);
+        } else {
+          _routingManager.buildRouting(physicalOrLogicalTable);
+          TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable);
+          _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
+              
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
+          _queryQuotaManager.createDatabaseRateLimiter(
+              
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalOrLogicalTable));
+        }
       } catch (Exception e) {
         LOGGER.error("Caught exception while processing transition from 
OFFLINE to ONLINE for table: {}",
-            tableNameWithType, e);
+            physicalOrLogicalTable, e);
         throw e;
       }
     }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index f4da13621e..033a126ea6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -27,6 +27,7 @@ import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManage
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
+import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
@@ -62,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac
         return new RefreshSegmentMessageHandler(new 
SegmentRefreshMessage(message), context);
       case TableConfigRefreshMessage.REFRESH_TABLE_CONFIG_MSG_SUB_TYPE:
         return new RefreshTableConfigMessageHandler(new 
TableConfigRefreshMessage(message), context);
+      case 
LogicalTableConfigRefreshMessage.REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE:
+        return new RefreshLogicalTableConfigMessageHandler(new 
LogicalTableConfigRefreshMessage(message), context);
       case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE:
         return new RebuildRoutingTableMessageHandler(new 
RoutingTableRebuildMessage(message), context);
       case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
@@ -139,6 +142,30 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac
     }
   }
 
+  private class RefreshLogicalTableConfigMessageHandler extends MessageHandler 
{
+    final String _logicalTableName;
+
+    RefreshLogicalTableConfigMessageHandler(LogicalTableConfigRefreshMessage 
refreshMessage,
+        NotificationContext context) {
+      super(refreshMessage, context);
+      _logicalTableName = refreshMessage.getLogicalTableName();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _routingManager.buildRoutingForLogicalTable(_logicalTableName);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing logical table config for table: 
{} (error code: {}, error type: {})",
+          _logicalTableName, code, type, e);
+    }
+  }
+
   private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
     final String _databaseName;
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 9c54f57618..801b39055b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -70,6 +70,8 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -418,17 +420,72 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
   }
 
   /**
-   * Builds/rebuilds the routing for the physical table, for logical tables it 
is skipped.
-   * @param physicalOrLogicalTable a physical table with type or logical table 
name
+   * Builds the routing for a logical table. This method is called when a 
logical table is created or updated.
+   * @param logicalTableName the name of the logical table
    */
-  public synchronized void buildRouting(String physicalOrLogicalTable) {
-    // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
-      LOGGER.info("Skipping route building for logical table: {}", 
physicalOrLogicalTable);
+  public synchronized void buildRoutingForLogicalTable(String 
logicalTableName) {
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName);
+    Preconditions.checkState(logicalTableConfig != null, "Failed to find 
logical table config for: %s",
+        logicalTableConfig);
+    if (!logicalTableConfig.isHybridLogicalTable()) {
+      LOGGER.info("Skip time boundary manager setting for non hybrid logical 
table: {}", logicalTableName);
       return;
     }
 
-    String tableNameWithType = physicalOrLogicalTable;
+    LOGGER.info("Setting time boundary manager for logical table: {}", 
logicalTableName);
+
+    TimeBoundaryConfig timeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
+    
Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
+        "Invalid time boundary strategy: %s", 
timeBoundaryConfig.getBoundaryStrategy());
+    List<String> includedTables =
+        (List<String>) 
timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+
+    for (String tableNameWithType : includedTables) {
+      
Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType),
+          "Invalid table in the time boundary config: %s", tableNameWithType);
+      try {
+        // build routing if it does not exist for the offline table
+        if (!_routingEntryMap.containsKey(tableNameWithType)) {
+          buildRouting(tableNameWithType);
+        }
+
+        if (_routingEntryMap.get(tableNameWithType).getTimeBoundaryManager() 
!= null) {
+          LOGGER.info("Skip time boundary manager init for table: {}", 
tableNameWithType);
+          continue;
+        }
+
+        // init time boundary manager for the table
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
+
+        String idealStatePath = getIdealStatePath(tableNameWithType);
+        IdealState idealState = getIdealState(idealStatePath);
+        Preconditions.checkState(idealState != null, "Failed to find ideal 
state for table: %s", tableNameWithType);
+
+        String externalViewPath = getExternalViewPath(tableNameWithType);
+        ExternalView externalView = getExternalView(externalViewPath);
+
+        Set<String> onlineSegments = getOnlineSegments(idealState);
+        SegmentPreSelector segmentPreSelector =
+            SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
+        Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
+
+        TimeBoundaryManager timeBoundaryManager = new 
TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics);
+        timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
+
+        
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+      } catch (Exception e) {
+        LOGGER.error("Caught unexpected exception while setting time boundary 
manager for table: {}", tableNameWithType,
+            e);
+      }
+    }
+  }
+
+  /**
+   * Builds the routing for a table.
+   * @param tableNameWithType the name of the table
+   */
+  public synchronized void buildRouting(String tableNameWithType) {
     LOGGER.info("Building routing for table: {}", tableNameWithType);
 
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java
new file mode 100644
index 0000000000..dc16baa4ee
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request 
is received to update the logical table
+ * config.
+ *
+ * NOTE: We keep the table name as a separate key instead of using the Helix 
PARTITION_NAME so that this message can be
+ *       used for any resource.
+ */
+public class LogicalTableConfigRefreshMessage extends Message {
+  public static final String REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE = 
"REFRESH_LOGICAL_TABLE_CONFIG";
+
+  private static final String TABLE_NAME_KEY = "logicalTableName";
+
+  /**
+   * Constructor for the sender.
+   */
+  public LogicalTableConfigRefreshMessage(String logicalTableName) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is 
alive
+    setExecutionTimeout(-1);
+    // Set the Pinot specific fields
+    // NOTE: DO NOT use Helix field "PARTITION_NAME" because it can be 
overridden by Helix while sending the message
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(TABLE_NAME_KEY, logicalTableName);
+  }
+
+  /**
+   * Constructor for the receiver.
+   */
+  public LogicalTableConfigRefreshMessage(Message message) {
+    super(message.getRecord());
+    if 
(!message.getMsgSubType().equals(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE)) {
+      throw new IllegalArgumentException("Invalid message subtype:" + 
message.getMsgSubType());
+    }
+  }
+
+  public String getLogicalTableName() {
+    return getRecord().getSimpleField(TABLE_NAME_KEY);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 829a273f2d..029f5c3491 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -883,4 +883,8 @@ public class ZKMetadataProvider {
   public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableName) {
     return 
propertyStore.exists(constructPropertyStorePathForLogical(tableName), 
AccessOption.PERSISTENT);
   }
+
+  public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableName) {
+    return 
propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), 
AccessOption.PERSISTENT);
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index d9b5e69b1e..42d5e43f9f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -64,6 +65,10 @@ public class LogicalTableConfigUtils {
     if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY) 
!= null) {
       
builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
     }
+    String timeBoundaryConfigJson = 
record.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY);
+    if (timeBoundaryConfigJson != null) {
+      
builder.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson, 
TimeBoundaryConfig.class));
+    }
 
     Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
     for (Map.Entry<String, String> entry : 
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
@@ -105,6 +110,10 @@ public class LogicalTableConfigUtils {
       record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
           logicalTableConfig.getRefRealtimeTableName());
     }
+    if (logicalTableConfig.getTimeBoundaryConfig() != null) {
+      record.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY,
+          logicalTableConfig.getTimeBoundaryConfig().toJsonString());
+    }
     return record;
   }
 
@@ -199,5 +208,25 @@ public class LogicalTableConfigUtils {
       throw new IllegalArgumentException(
           "Invalid logical table. Reason: Schema with same name as logical 
table '" + tableName + "' does not exist");
     }
+
+    // validate time boundary config is not null for hybrid tables
+    TimeBoundaryConfig timeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
+    if (logicalTableConfig.isHybridLogicalTable() && timeBoundaryConfig == 
null) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'timeBoundaryConfig' should not be 
null for hybrid logical tables");
+    }
+
+    // time boundary strategy should not be null or empty
+    if (timeBoundaryConfig != null && 
StringUtils.isEmpty(timeBoundaryConfig.getBoundaryStrategy())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 
'timeBoundaryConfig.boundaryStrategy' should not be null or empty");
+    }
+
+    // validate time boundary config parameters
+    if (timeBoundaryConfig != null
+        && (timeBoundaryConfig.getParameters() == null || 
timeBoundaryConfig.getParameters().isEmpty())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'timeBoundaryConfig.parameters' 
should not be null or empty");
+    }
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 81c14fc2ca..892e3fbc11 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -105,6 +105,7 @@ import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
 import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
+import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage;
 import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -179,6 +180,7 @@ import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -2150,6 +2152,14 @@ public class PinotHelixResourceManager {
       updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
     }
 
+    TimeBoundaryConfig oldTimeBoundaryConfig = 
oldLogicalTableConfig.getTimeBoundaryConfig();
+    TimeBoundaryConfig newTimeBoundaryConfig = 
logicalTableConfig.getTimeBoundaryConfig();
+    // compare the old and new time boundary config and send message if they 
are different
+    if ((oldTimeBoundaryConfig != null && 
!oldTimeBoundaryConfig.equals(newTimeBoundaryConfig))
+    || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) {
+      sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName());
+    }
+
     LOGGER.info("Updated logical table {}: Successfully updated table", 
tableName);
   }
 
@@ -3172,6 +3182,27 @@ public class PinotHelixResourceManager {
     }
   }
 
+  private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
+    LogicalTableConfigRefreshMessage refreshMessage = new 
LogicalTableConfigRefreshMessage(logicalTableName);
+
+    // Send logical table config refresh message to brokers
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+    recipientCriteria.setSessionSpecific(true);
+    recipientCriteria.setPartition(logicalTableName);
+    // Send message with no callback and infinite timeout on the recipient
+    int numMessagesSent =
+        _helixZkManager.getMessagingService().send(recipientCriteria, 
refreshMessage, null, -1);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} logical table config refresh messages to brokers 
for table: {}", numMessagesSent,
+          logicalTableName);
+    } else {
+      LOGGER.warn("No logical table config refresh message sent to brokers for 
table: {}", logicalTableName);
+    }
+  }
+
   private void sendApplicationQpsQuotaRefreshMessage(String appName) {
     ApplicationQpsQuotaRefreshMessage message = new 
ApplicationQpsQuotaRefreshMessage(appName);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index ced4a3f6c9..3458777e8d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -333,6 +334,59 @@ public class PinotLogicalTableResourceTest extends ControllerTest {
         throwable.getMessage());
   }
 
+  public void testLogicalTableTimeBoundaryConfigValidation()
+      throws IOException {
+    // Test logical table time boundary strategy validation
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_8"));
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+
+    // Test logical table with no time boundary config
+    logicalTableConfig.setTimeBoundaryConfig(null);
+    Throwable throwable = expectThrows(IOException.class, () -> {
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: 'timeBoundaryConfig' should not be null for 
hybrid logical tables"),
+        throwable.getMessage());
+
+    // Test logical table with time boundary config but null strategy
+    logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig(null, 
null));
+    throwable = expectThrows(IOException.class, () -> {
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: 'timeBoundaryConfig.strategy' should not be 
null or empty for hybrid logical tables"),
+        throwable.getMessage());
+
+    // Test logical table with time boundary config but empty strategy
+    logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("", null));
+    throwable = expectThrows(IOException.class, () -> {
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: 'timeBoundaryConfig.strategy' should not be 
null or empty for hybrid logical tables"),
+        throwable.getMessage());
+
+    // Test logical table with time boundary config but null parameters
+    logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min", 
null));
+    throwable = expectThrows(IOException.class, () -> {
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: 'timeBoundaryConfig.parameters' should not be 
null or empty for hybrid logical tables"),
+        throwable.getMessage());
+
+    // Test logical table with time boundary config but empty parameters
+    logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min", 
Map.of()));
+    throwable = expectThrows(IOException.class, () -> {
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: 'timeBoundaryConfig.parameters' should not be 
null or empty for hybrid logical tables"),
+        throwable.getMessage());
+  }
+
   @Test
   public void testLogicalTableWithSameNameNotAllowed()
       throws IOException {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 9f775d3201..38c29dfeb7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -81,6 +81,7 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -408,6 +409,7 @@ public class ControllerTest {
         .setRefRealtimeTableName(realtimeTableName)
         .setQuotaConfig(new QuotaConfig(null, "999"))
         .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L))
+        .setTimeBoundaryConfig(new TimeBoundaryConfig("min", 
Map.of("includedTables", physicalTableNames)))
         .setPhysicalTableConfigMap(physicalTableConfigMap);
     return builder.build();
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 7405a00ce5..3f2bffd888 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -24,9 +24,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.controller.helix.ControllerRequestClient;
 import org.apache.pinot.controller.helix.ControllerTest;
@@ -39,7 +41,9 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -63,8 +67,8 @@ public abstract class BaseLogicalTableIntegrationTest extends 
BaseClusterIntegra
   private static final String DEFAULT_TENANT = "DefaultTenant";
   private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
   protected static final String DEFAULT_TABLE_NAME = "physicalTable";
-  private static final int NUM_OFFLINE_SEGMENTS = 12;
   protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = 
null;
+  protected List<File> _avroFiles;
 
   @BeforeSuite
   public void setUpSuite()
@@ -99,11 +103,6 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     LOGGER.info("Finished tearing down integration test suite");
   }
 
-  @Override
-  protected String getTableName() {
-    return DEFAULT_TABLE_NAME;
-  }
-
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -113,10 +112,12 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
       _helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
     }
 
-    List<File> avroFiles = getAllAvroFiles();
-    int numSegmentsPerTable = NUM_OFFLINE_SEGMENTS / 
getOfflineTableNames().size();
-    int index = 0;
-    for (String tableName : getOfflineTableNames()) {
+    _avroFiles = getAllAvroFiles();
+    Map<String, List<File>> offlineTableDataFiles = getOfflineTableDataFiles();
+    for (Map.Entry<String, List<File>> entry : 
offlineTableDataFiles.entrySet()) {
+      String tableName = entry.getKey();
+      List<File> avroFilesForTable = entry.getValue();
+
       File tarDir = new File(_tarDir, tableName);
 
       TestUtils.ensureDirectoriesExistAndEmpty(tarDir);
@@ -128,25 +129,37 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
       TableConfig offlineTableConfig = createOfflineTableConfig(tableName);
       addTableConfig(offlineTableConfig);
 
-      List<File> offlineAvroFiles = new ArrayList<>(numSegmentsPerTable);
-      for (int i = index; i < index + numSegmentsPerTable; i++) {
-        offlineAvroFiles.add(avroFiles.get(i));
-      }
-      index += numSegmentsPerTable;
-
       // Create and upload segments
-      ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 
offlineTableConfig, schema, 0, _segmentDir,
+      ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFilesForTable, 
offlineTableConfig, schema, 0, _segmentDir,
           tarDir);
       uploadSegments(tableName, tarDir);
     }
 
+    // create realtime table
+    Map<String, List<File>> realtimeTableDataFiles = 
getRealtimeTableDataFiles();
+    for (Map.Entry<String, List<File>> entry : 
realtimeTableDataFiles.entrySet()) {
+      String tableName = entry.getKey();
+      List<File> avroFilesForTable = entry.getValue();
+      // create and upload the schema and table config
+      Schema schema = createSchema(getSchemaFileName());
+      schema.setSchemaName(tableName);
+      addSchema(schema);
+
+      TableConfig realtimeTableConfig = 
createRealtimeTableConfig(avroFilesForTable.get(0));
+      realtimeTableConfig.setTableName(tableName);
+      addTableConfig(realtimeTableConfig);
+
+      // push avro files into kafka
+      pushAvroIntoKafka(avroFilesForTable);
+    }
+
     createLogicalTable();
 
     // Set up the H2 connection
-    setUpH2Connection(avroFiles);
+    setUpH2Connection(_avroFiles);
 
     // Initialize the query generator
-    setUpQueryGenerator(avroFiles);
+    setUpQueryGenerator(_avroFiles);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
@@ -158,11 +171,70 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     cleanup();
   }
 
-  protected abstract List<String> getOfflineTableNames();
+  /**
+   * Raw names (without the table type suffix) of the OFFLINE physical tables used by the test.
+   * The default is an empty list; subclasses override this to add OFFLINE tables.
+   */
+  protected List<String> getOfflineTableNames() {
+    return List.of();
+  }
+
+  /**
+   * Raw names (without the table type suffix) of the REALTIME physical tables used by the test.
+   * The default is an empty list; subclasses override this to add REALTIME tables.
+   */
+  protected List<String> getRealtimeTableNames() {
+    return List.of();
+  }
+
+  /**
+   * Maps every OFFLINE table name to the Avro files used to build its segments.
+   *
+   * @return an empty map when there is no OFFLINE table, otherwise all of {@code _avroFiles} spread over the tables
+   */
+  protected Map<String, List<File>> getOfflineTableDataFiles() {
+    List<String> tableNames = getOfflineTableNames();
+    if (tableNames.isEmpty()) {
+      return Map.of();
+    }
+    return distributeFilesToTables(tableNames, _avroFiles);
+  }
+
+  /**
+   * Maps every REALTIME table name to the Avro files pushed into Kafka for it.
+   *
+   * @return an empty map when there is no REALTIME table, otherwise all of {@code _avroFiles} spread over the tables
+   */
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    List<String> tableNames = getRealtimeTableNames();
+    if (tableNames.isEmpty()) {
+      return Map.of();
+    }
+    return distributeFilesToTables(tableNames, _avroFiles);
+  }
+
+  /**
+   * Assigns Avro files to tables round-robin: file {@code i} goes to table {@code i % tableNames.size()}.
+   *
+   * @param tableNames tables that receive files
+   * @param avroFiles files to hand out, in list order
+   * @return a mutable map with one entry per table name; a table maps to an empty list when there are fewer
+   *         files than tables, and the map is empty when {@code tableNames} is empty
+   */
+  protected Map<String, List<File>> distributeFilesToTables(List<String> tableNames, List<File> avroFiles) {
+    Map<String, List<File>> tableNameToFilesMap = new HashMap<>();
+    // Without this guard the modulo below throws ArithmeticException ("/ by zero") for an empty table list
+    if (tableNames.isEmpty()) {
+      return tableNameToFilesMap;
+    }
+
+    // Initialize the map with empty lists so that tables receiving no file still show up in the result
+    tableNames.forEach(table -> tableNameToFilesMap.put(table, new ArrayList<>()));
+
+    // Round-robin distribution of files to table names
+    int numTables = tableNames.size();
+    for (int i = 0; i < avroFiles.size(); i++) {
+      tableNameToFilesMap.get(tableNames.get(i % numTables)).add(avroFiles.get(i));
+    }
+    return tableNameToFilesMap;
+  }
+
+  /**
+   * Finds the OFFLINE table owning the segment with the greatest {@code endTimeMillis}, as reported by the
+   * controller's segment metadata endpoint (each top-level field of the response is treated as one segment).
+   *
+   * @return a single-element list holding that table's name with the OFFLINE type suffix, or an empty list when
+   *         no segment was found for any OFFLINE table
+   */
+  private List<String> getTimeBoundaryTable() {
+    String latestTable = null;
+    long latestEndTimeMillis = Long.MIN_VALUE;
+    try {
+      for (String offlineTable : getOfflineTableNames()) {
+        String metadataUrl = _controllerRequestURLBuilder.forSegmentMetadata(offlineTable, TableType.OFFLINE);
+        JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(metadataUrl));
+        for (Iterator<String> segmentNames = segmentsMetadata.fieldNames(); segmentNames.hasNext();) {
+          JsonNode segmentMetadata = segmentsMetadata.get(segmentNames.next());
+          long segmentEndTimeMillis = segmentMetadata.get("endTimeMillis").asLong();
+          if (segmentEndTimeMillis > latestEndTimeMillis) {
+            latestEndTimeMillis = segmentEndTimeMillis;
+            latestTable = offlineTable;
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get the time boundary table", e);
+    }
+    if (latestTable == null) {
+      return List.of();
+    }
+    return List.of(TableNameBuilder.OFFLINE.tableNameWithType(latestTable));
+  }
 
   protected List<String> getPhysicalTableNames() {
-    return getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType)
-        .collect(Collectors.toList());
+    // OFFLINE tables first, then REALTIME tables, each name carrying its table type suffix
+    Stream<String> offlineNames = getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType);
+    Stream<String> realtimeNames = getRealtimeTableNames().stream().map(TableNameBuilder.REALTIME::tableNameWithType);
+    return Stream.concat(offlineNames, realtimeNames).collect(Collectors.toList());
   }
 
   protected String getLogicalTableName() {
@@ -205,7 +277,7 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     // @formatter:on
   }
 
-  public static LogicalTableConfig getLogicalTableConfig(String tableName, 
List<String> physicalTableNames,
+  public LogicalTableConfig getLogicalTableConfig(String tableName, 
List<String> physicalTableNames,
       String brokerTenant) {
     Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
     for (String physicalTableName : physicalTableNames) {
@@ -216,9 +288,16 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
     String realtimeTableName =
         
physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null);
     LogicalTableConfigBuilder builder =
-        new 
LogicalTableConfigBuilder().setTableName(tableName).setBrokerTenant(brokerTenant)
-            
.setRefOfflineTableName(offlineTableName).setRefRealtimeTableName(realtimeTableName)
+        new LogicalTableConfigBuilder().setTableName(tableName)
+            .setBrokerTenant(brokerTenant)
+            .setRefOfflineTableName(offlineTableName)
+            .setRefRealtimeTableName(realtimeTableName)
             .setPhysicalTableConfigMap(physicalTableConfigMap);
+    if (!getOfflineTableNames().isEmpty() && 
!getRealtimeTableNames().isEmpty()) {
+      builder.setTimeBoundaryConfig(
+          new TimeBoundaryConfig("min", Map.of("includedTables", 
getTimeBoundaryTable()))
+      );
+    }
     return builder.build();
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..d6f599ce1b
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Logical table integration test backed by one OFFLINE table ({@code o_1}) and one REALTIME table ({@code r_1}).
+ */
+public class LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  // How many Avro files from the end of the list are fed to the REALTIME table
+  private static final int NUM_REALTIME_FILES = 4;
+
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // Overlapping data files for the hybrid table: the REALTIME side takes only the last files of the list
+    List<String> realtimeTables = getRealtimeTableNames();
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - NUM_REALTIME_FILES, numFiles);
+    return distributeFilesToTables(realtimeTables, realtimeFiles);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..dee7e1f59c
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.util.List;
+
+
+/**
+ * Logical table integration test backed by a single REALTIME table ({@code r_1}); no OFFLINE table is declared.
+ */
+public class LogicalTableWithOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  private static final String REALTIME_TABLE_NAME = "r_1";
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of(REALTIME_TABLE_NAME);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..96eb11e2fe
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Logical table integration test backed by twelve OFFLINE tables ({@code o_1} .. {@code o_12}) and one REALTIME
+ * table ({@code r_1}).
+ */
+public class LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  // How many Avro files from the end of the list are fed to the REALTIME table
+  private static final int NUM_REALTIME_FILES = 2;
+
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1", "o_2", "o_3", "o_4", "o_5", "o_6", "o_7", "o_8", "o_9", "o_10", "o_11", "o_12");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // Overlapping data files for the hybrid table: the REALTIME side takes only the last files of the list
+    List<String> realtimeTables = getRealtimeTableNames();
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - NUM_REALTIME_FILES, numFiles);
+    return distributeFilesToTables(realtimeTables, realtimeFiles);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
new file mode 100644
index 0000000000..d406b2ad56
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.logicaltable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+
+/**
+ * Logical table integration test backed by two OFFLINE tables ({@code o_1}, {@code o_2}) and one REALTIME table
+ * ({@code r_1}). It additionally switches the time boundary table of the logical table and re-runs the queries.
+ */
+public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest {
+  // How many Avro files from the end of the list are fed to the REALTIME table
+  private static final int NUM_REALTIME_FILES = 4;
+
+  @Override
+  protected List<String> getOfflineTableNames() {
+    return List.of("o_1", "o_2");
+  }
+
+  @Override
+  protected List<String> getRealtimeTableNames() {
+    return List.of("r_1");
+  }
+
+  @Override
+  protected Map<String, List<File>> getRealtimeTableDataFiles() {
+    // Overlapping data files for the hybrid table: the REALTIME side takes only the last files of the list
+    List<String> realtimeTables = getRealtimeTableNames();
+    int numFiles = _avroFiles.size();
+    List<File> realtimeFiles = _avroFiles.subList(numFiles - NUM_REALTIME_FILES, numFiles);
+    return distributeFilesToTables(realtimeTables, realtimeFiles);
+  }
+
+  @Test
+  public void testUpdateLogicalTableTimeBoundary()
+      throws Exception {
+    // Point the time boundary at the other OFFLINE table and push the updated logical table config
+    updateTimeBoundaryTableInLogicalTable(getLogicalTableConfig(getLogicalTableName()));
+
+    // Wait to ensure logical table config update helix message is processed in broker
+    waitForAllDocsLoaded(5_000);
+
+    // Run the tests
+    testGeneratedQueries();
+    testHardcodedQueries();
+    testQueriesFromQueryFile();
+  }
+
+  /**
+   * Replaces the first table listed under {@code includedTables} with the other OFFLINE table
+   * ({@code o_1} becomes {@code o_2}, anything else becomes {@code o_1}) and uploads the modified config.
+   */
+  private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    List<String> currentTables =
+        (List<String>) logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables");
+    String currentRawName = TableNameBuilder.extractRawTableName(currentTables.get(0));
+
+    String swappedRawName;
+    if (currentRawName.equals("o_1")) {
+      swappedRawName = "o_2";
+    } else {
+      swappedRawName = "o_1";
+    }
+    String swappedTableName = TableNameBuilder.OFFLINE.tableNameWithType(swappedRawName);
+
+    Map<String, Object> parameters = Map.of("includedTables", List.of(swappedTableName));
+    logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
+
+    updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig);
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
index 60ddd345ae..fe3937b8b0 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java
@@ -26,6 +26,8 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.TableRouteInfo;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
@@ -78,6 +80,20 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
       routeInfo.setRealtimeTableConfig(realtimeTableConfig);
     }
     routeInfo.setQueryConfig(logicalTable.getQueryConfig());
+
+    TimeBoundaryInfo timeBoundaryInfo;
+    if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) {
+      String boundaryStrategy = 
logicalTable.getTimeBoundaryConfig().getBoundaryStrategy();
+      TimeBoundaryStrategy timeBoundaryStrategy =
+          
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy);
+      timeBoundaryInfo = 
timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache, 
routingManager);
+      if (timeBoundaryInfo == null) {
+        LOGGER.info("No time boundary info found for logical hybrid table: ");
+        routeInfo.setOfflineTables(null);
+      } else {
+        routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
+      }
+    }
     return routeInfo;
   }
 
@@ -111,17 +127,6 @@ public class LogicalTableRouteProvider implements 
TableRouteProvider {
       }
     }
 
-    TimeBoundaryInfo timeBoundaryInfo = null;
-    if (routeInfo.hasRealtime() && routeInfo.hasOffline()) {
-      timeBoundaryInfo = 
routingManager.getTimeBoundaryInfo(routeInfo.getOfflineTables().get(0).getOfflineTableName());
-      if (timeBoundaryInfo == null) {
-        LOGGER.debug("No time boundary info found for hybrid table: ");
-        routeInfo.setOfflineTables(null);
-      } else {
-        routeInfo.setTimeBoundaryInfo(timeBoundaryInfo);
-      }
-    }
-
     //Set BrokerRequests to NULL if there is no route.
     if (routeInfo.getOfflineExecutionServers().isEmpty()) {
       routeInfo.setOfflineBrokerRequest(null);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
new file mode 100644
index 0000000000..5d3c98596b
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import com.google.auto.service.AutoService;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Time boundary strategy registered under the name {@code "min"}. Among the tables listed under the
+ * {@code includedTables} parameter of the logical table's time boundary config, it returns the
+ * {@link TimeBoundaryInfo} whose time value is the smallest after conversion to millis with the format of the
+ * table's time column.
+ */
+@AutoService(TimeBoundaryStrategy.class)
+public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
+
+  @Override
+  public String getName() {
+    return "min";
+  }
+
+  /**
+   * Computes the minimum time boundary over the included tables.
+   *
+   * @param logicalTableConfig logical table config whose time boundary config supplies {@code includedTables}
+   * @param tableCache source of the schema and table config of each included table
+   * @param routingManager source of the per-table time boundary info
+   * @return the time boundary info with the smallest millis value (the first one wins on ties), or {@code null}
+   *         when no table is included or the routing manager has no time boundary for any of them
+   * @throws IllegalArgumentException if a table that has a time boundary lacks a table config, a schema, or a
+   *         field spec for its time column
+   */
+  @Override
+  public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache,
+      RoutingManager routingManager) {
+    Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters();
+    // The parameter map is untyped, so the cast cannot be checked; entries are handed to the routing manager as-is
+    @SuppressWarnings("unchecked")
+    List<String> includedTables =
+        parameters != null ? (List<String>) parameters.getOrDefault("includedTables", List.of()) : List.of();
+
+    TimeBoundaryInfo minTimeBoundaryInfo = null;
+    long minTimeBoundary = Long.MAX_VALUE;
+    for (String physicalTableName : includedTables) {
+      TimeBoundaryInfo current = routingManager.getTimeBoundaryInfo(physicalTableName);
+      if (current == null) {
+        // No time boundary known for this table, so it does not take part in the minimum
+        continue;
+      }
+      long currentTimeBoundaryMillis = toMillis(current, physicalTableName, tableCache);
+      if (minTimeBoundaryInfo == null || currentTimeBoundaryMillis < minTimeBoundary) {
+        minTimeBoundaryInfo = current;
+        minTimeBoundary = currentTimeBoundaryMillis;
+      }
+    }
+    return minTimeBoundaryInfo;
+  }
+
+  /**
+   * Converts the time value of a table's time boundary to millis using the format spec of its time column.
+   */
+  private static long toMillis(TimeBoundaryInfo timeBoundaryInfo, String physicalTableName, TableCache tableCache) {
+    String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName);
+    Schema schema = tableCache.getSchema(rawTableName);
+    TableConfig tableConfig = tableCache.getTableConfig(physicalTableName);
+    Preconditions.checkArgument(tableConfig != null,
+        "Table config not found for table: %s", physicalTableName);
+    Preconditions.checkArgument(schema != null,
+        "Schema not found for table: %s", physicalTableName);
+    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
+    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+    Preconditions.checkArgument(dateTimeFieldSpec != null,
+        "Time column not found in schema for table: %s", physicalTableName);
+    DateTimeFormatSpec formatSpec = dateTimeFieldSpec.getFormatSpec();
+    return formatSpec.fromFormatToMillis(timeBoundaryInfo.getTimeValue());
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
new file mode 100644
index 0000000000..c1b97f28c5
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+/**
+ * Strategy that derives the time boundary of a logical table. TimeBoundaryStrategyService discovers
+ * implementations through ServiceLoader and indexes them by {@link #getName()}.
+ */
+public interface TimeBoundaryStrategy {
+
+  /**
+   * Returns the time boundary strategy name. TimeBoundaryStrategyService uses it as the lookup key and rejects
+   * two strategies that report the same name.
+   *
+   * @return The time boundary strategy name.
+   */
+  String getName();
+
+  /**
+   * Computes the time boundary for the given logical table.
+   *
+   * @param logicalTableConfig The logical table configuration.
+   * @param tableCache The table cache to use for fetching table metadata.
+   * @param routingManager The routing manager to use for computing the time boundary.
+   * @return The computed time boundary information. LogicalTableRouteProvider treats {@code null} as "no time
+   *         boundary found", and MinTimeBoundaryStrategy returns {@code null} in that case.
+   */
+  TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, 
TableCache tableCache,
+      RoutingManager routingManager);
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
new file mode 100644
index 0000000000..0e3f6af4aa
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+
+
+/**
+ * Registry of {@link TimeBoundaryStrategy} implementations keyed by {@link TimeBoundaryStrategy#getName()}.
+ * A shared instance is built from {@link ServiceLoader} when the class is initialized and can be replaced
+ * through {@link #setInstance(TimeBoundaryStrategyService)}.
+ */
+public class TimeBoundaryStrategyService {
+
+  private static volatile TimeBoundaryStrategyService _instance = fromServiceLoader();
+  private final Map<String, TimeBoundaryStrategy> _strategyMap;
+
+  private TimeBoundaryStrategyService(Map<String, TimeBoundaryStrategy> strategyMap) {
+    _strategyMap = strategyMap;
+  }
+
+  /**
+   * Builds a new service from every {@link TimeBoundaryStrategy} that {@link ServiceLoader} can find.
+   * The returned service is NOT installed as the shared instance.
+   *
+   * @return A service holding the discovered strategies.
+   * @throws IllegalStateException If two strategies report the same name.
+   */
+  public static TimeBoundaryStrategyService fromServiceLoader() {
+    Map<String, TimeBoundaryStrategy> strategyMap = new java.util.HashMap<>();
+    for (TimeBoundaryStrategy strategy : ServiceLoader.load(TimeBoundaryStrategy.class)) {
+      String strategyName = strategy.getName();
+      if (strategyMap.containsKey(strategyName)) {
+        throw new IllegalStateException("Duplicate TimeBoundaryStrategy found: " + strategyName);
+      }
+      strategyMap.put(strategyName, strategy);
+    }
+    return new TimeBoundaryStrategyService(strategyMap);
+  }
+
+  /**
+   * Returns the singleton instance of the TimeBoundaryStrategyService.
+   *
+   * @return The singleton instance of the TimeBoundaryStrategyService.
+   */
+  public static TimeBoundaryStrategyService getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Sets the singleton instance of the TimeBoundaryStrategyService.
+   *
+   * @param service The new instance to set.
+   */
+  public static void setInstance(TimeBoundaryStrategyService service) {
+    _instance = service;
+  }
+
+  /**
+   * Looks up a strategy by name in THIS service's registry.
+   *
+   * @param name The strategy name, as reported by {@link TimeBoundaryStrategy#getName()}.
+   * @return The matching strategy, never {@code null}.
+   * @throws IllegalArgumentException If no strategy is registered under the given name.
+   */
+  public TimeBoundaryStrategy getTimeBoundaryStrategy(String name) {
+    // Read this instance's own map: going through _instance would make every service object answer from
+    // whichever instance happens to be installed as the singleton
+    TimeBoundaryStrategy strategy = _strategyMap.get(name);
+    if (strategy == null) {
+      throw new IllegalArgumentException("No TimeBoundaryStrategy found for name: " + name);
+    }
+    return strategy;
+  }
+}
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
index dd8d206b5a..1c08dca4ae 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java
@@ -31,22 +31,31 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.ServerRouteInfo;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.mockito.MockedStatic;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -81,6 +90,7 @@ public class BaseTableRouteTest {
 
   public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>();
   private static final Set<String> DISABLED_TABLES = new HashSet<>();
+
   static {
     TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build());
     TABLE_SCHEMAS.put("b_OFFLINE", getSchemaBuilder("b").build());
@@ -125,7 +135,8 @@ public class BaseTableRouteTest {
   TableCache _tableCache;
   ImplicitHybridTableRouteProvider _hybridTableRouteProvider;
   LogicalTableRouteProvider _logicalTableRouteProvider;
-
+  TimeBoundaryStrategy _timeBoundaryStrategy;
+  MockedStatic<TimeBoundaryStrategyService> _timeBoundaryStrategyFactoryMockedStatic;
 
   @BeforeClass
   public void setUp() {
@@ -154,6 +165,17 @@ public class BaseTableRouteTest {
     _tableCache = factory.buildTableCache();
     _hybridTableRouteProvider = new ImplicitHybridTableRouteProvider();
     _logicalTableRouteProvider = new LogicalTableRouteProvider();
+    _timeBoundaryStrategyFactoryMockedStatic = mockStatic(TimeBoundaryStrategyService.class);
+    _timeBoundaryStrategy = mock(TimeBoundaryStrategy.class);
+    TimeBoundaryStrategyService mockService = mock(TimeBoundaryStrategyService.class);
+    when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService);
+    when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy);
+    when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(), any())).thenReturn(mock(TimeBoundaryInfo.class));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _timeBoundaryStrategyFactoryMockedStatic.close();
   }
 
   @DataProvider(name = "offlineTableProvider")
@@ -413,6 +435,8 @@ public class BaseTableRouteTest {
 
     builder.setPhysicalTableConfigMap(tableConfigMap);
     builder.setBrokerTenant("brokerTenant");
+    builder.setTimeBoundaryConfig(
+        new TimeBoundaryConfig("min", Map.of("includedTables", List.of("randomTable_OFFLINE"))));
     LogicalTableConfig logicalTable = builder.build();
     when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
 
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
index f7280b3319..f76a3bfdb9 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java
@@ -33,6 +33,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -95,6 +96,12 @@ public class LogicalTableRouteProviderCalculateRouteTest extends BaseTableRouteT
       assertNotNull(requestMap);
       assertFalse(requestMap.isEmpty());
     }
+
+    if (routeInfo.isHybrid()) {
+      assertNotNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info should not be null for hybrid table");
+    } else {
+      assertNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info should be null for non-hybrid table");
+    }
   }
 
   @Test(dataProvider = "offlineTableAndRouteProvider")
@@ -107,6 +114,13 @@ public class LogicalTableRouteProviderCalculateRouteTest extends BaseTableRouteT
     assertTableRoute(tableName, "realtimeTableAndRouteProvider", null, expectedRoutingTable, false, true);
   }
 
+  @Test(dataProvider = "hybridTableAndRouteProvider")
+  void testHybridTableRoute(String tableName, Map<String, Set<String>> expectedOfflineRoutingTable,
+      Map<String, Set<String>> expectedRealtimeRoutingTable) {
+    assertTableRoute(tableName, "hybridTableAndRouteProvider", expectedOfflineRoutingTable,
+        expectedRealtimeRoutingTable, expectedOfflineRoutingTable != null, expectedRealtimeRoutingTable != null);
+  }
+
   @Test(dataProvider = "routeNotExistsProvider")
   void testRouteNotExists(String tableName) {
     assertTableRoute(tableName, "routeNotExistsProvider", null, null, false, false);
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
index e4417a9daa..d9d0a9fa51 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.pinot.core.transport.TableRouteInfo;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -68,6 +69,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testOfflineTable");
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isOffline(), "The table should be offline");
+    assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info");
   }
 
   @Test(dataProvider = "realtimeTableProvider")
@@ -75,6 +77,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testRealtimeTable");
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isRealtime(), "The table should be realtime");
+    assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info");
   }
 
   @Test(dataProvider = "hybridTableProvider")
@@ -82,6 +85,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testHybridTable");
     assertTrue(routeInfo.isExists(), "The table should exist");
     assertTrue(routeInfo.isHybrid(), "The table should be hybrid");
+    assertNotNull(routeInfo.getTimeBoundaryInfo(), "The table should have time boundary info");
   }
 
   @Test(dataProvider = "routeExistsProvider")
@@ -216,6 +220,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     assertTrue(routeInfo.isRouteExists());
     assertFalse(routeInfo.isDisabled());
     assertNull(routeInfo.getDisabledTableNames());
+    assertNotNull(routeInfo.getTimeBoundaryInfo());
   }
 
   @Test(dataProvider = "disabledTableProvider")
@@ -259,6 +264,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     }
     logicalTable.setPhysicalTableConfigMap(tableConfigMap);
     logicalTable.setBrokerTenant("brokerTenant");
+    logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames)));
     when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
 
     TableRouteInfo routeInfo =
@@ -297,6 +303,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest {
     }
     logicalTable.setPhysicalTableConfigMap(tableConfigMap);
     logicalTable.setBrokerTenant("brokerTenant");
+    logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames)));
     when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable);
 
     TableRouteInfo routeInfo =
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
new file mode 100644
index 0000000000..464ff805d9
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+
+
+public class MinTimeBoundaryStrategyTest {
+
+  TableCache _mockTableCache;
+  RoutingManager _mockRoutingManager;
+  TimeBoundaryStrategy _minTimeBoundaryStrategy = new MinTimeBoundaryStrategy();
+
+  public void setupMocks(Map<String, TimeBoundaryInfo> data) {
+    for (String tableName : data.keySet()) {
+      if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+        continue;
+      }
+      TimeBoundaryInfo timeBoundaryInfo = data.get(tableName);
+      when(_mockRoutingManager.getTimeBoundaryInfo(tableName)).thenReturn(timeBoundaryInfo);
+      Schema schema = mock(Schema.class);
+      TableConfig tableConfig = mock(TableConfig.class);
+      SegmentsValidationAndRetentionConfig validationMock = mock(SegmentsValidationAndRetentionConfig.class);
+      DateTimeFieldSpec dateTimeFieldSpec = mock(DateTimeFieldSpec.class);
+      DateTimeFormatSpec dateTimeFormatSpec = mock(DateTimeFormatSpec.class);
+
+      when(_mockTableCache.getSchema(TableNameBuilder.extractRawTableName(tableName))).thenReturn(schema);
+      when(_mockTableCache.getTableConfig(tableName)).thenReturn(tableConfig);
+      when(tableConfig.getValidationConfig()).thenReturn(validationMock);
+      when(validationMock.getTimeColumnName()).thenReturn(timeBoundaryInfo.getTimeColumn());
+      when(schema.getSpecForTimeColumn(timeBoundaryInfo.getTimeColumn())).thenReturn(dateTimeFieldSpec);
+      when(dateTimeFieldSpec.getFormatSpec()).thenReturn(dateTimeFormatSpec);
+      when(dateTimeFormatSpec.fromFormatToMillis(any())).thenReturn(Long.valueOf(timeBoundaryInfo.getTimeValue()));
+    }
+  }
+
+
+  @DataProvider
+  public Object[][] timeBoundaryData() {
+    Map<String, TimeBoundaryInfo> timeBoundaryInfoMap = Map.of(
+        "table1_OFFLINE", new TimeBoundaryInfo("timeColumn1", "1747134822000"),
+        "table2_OFFLINE", new TimeBoundaryInfo("timeColumn2", "1747134844000"),
+        "table3_OFFLINE", new TimeBoundaryInfo("timeColumn3", "1747134866000"),
+        "table4_OFFLINE", new TimeBoundaryInfo("timeColumn4", "1747134888000"),
+        "table5_REALTIME", new TimeBoundaryInfo("timeColumn5", "1747134900000")
+    );
+
+    return new Object[][]{
+        {timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"},
+        {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"},
+        {timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"), "table2_OFFLINE"},
+        {timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE", "table4_OFFLINE"), "table2_OFFLINE"},
+        {timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"}
+    };
+  }
+
+
+  @Test(dataProvider = "timeBoundaryData")
+  public void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> timeBoundaryInfoMap,
+      List<String> includedTables, String expectedTableName) {
+    Map<String, Object> parameters = Map.of("includedTables", includedTables);
+    testComputeTimeBoundary(timeBoundaryInfoMap, expectedTableName, parameters);
+  }
+
+  private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> timeBoundaryInfoMap, String expectedTableName,
+      Map<String, Object> parameters) {
+    setupMocks(timeBoundaryInfoMap);
+    TimeBoundaryInfo timeBoundaryInfo = _minTimeBoundaryStrategy.computeTimeBoundary(
+        createLogicalTableConfig(parameters), _mockTableCache, _mockRoutingManager);
+    assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName));
+  }
+
+  @BeforeMethod
+  public void setUp() {
+    _mockTableCache = mock(TableCache.class);
+    _mockRoutingManager = mock(RoutingManager.class);
+  }
+
+  private LogicalTableConfig createLogicalTableConfig(Map<String, Object> parameters) {
+    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+        .setTableName("logical_table")
+        .setTimeBoundaryConfig(new TimeBoundaryConfig("min", parameters));
+    return builder.build();
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java
new file mode 100644
index 0000000000..05993a437f
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.timeboundary;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+public class TimeBoundaryStrategyServiceTest {
+
+  @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "No TimeBoundaryStrategy found for name: invalidStrategy")
+  public void testInvalidTimeBoundaryStrategy() {
+    TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy("invalidStrategy");
+  }
+
+  @Test
+  public void testMinTimeBoundaryStrategy() {
+    TimeBoundaryStrategy timeBoundaryStrategy = TimeBoundaryStrategyService.getInstance()
+        .getTimeBoundaryStrategy("min");
+    assertTrue(timeBoundaryStrategy instanceof MinTimeBoundaryStrategy, "Expected MinTimeBoundaryStrategy instance");
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
index 2a427ce9dc..2c52148098 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -42,6 +42,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
   public static final String QUOTA_CONFIG_KEY = "quota";
   public static final String REF_OFFLINE_TABLE_NAME_KEY = "refOfflineTableName";
   public static final String REF_REALTIME_TABLE_NAME_KEY = "refRealtimeTableName";
+  public static final String TIME_BOUNDARY_CONFIG_KEY = "timeBoundaryConfig";
 
   private String _tableName;
   private String _brokerTenant;
@@ -52,6 +53,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
   private QuotaConfig _quotaConfig;
   private String _refOfflineTableName;
   private String _refRealtimeTableName;
+  private TimeBoundaryConfig _timeBoundaryConfig;
 
   public static LogicalTableConfig fromString(String logicalTableString)
       throws IOException {
@@ -119,6 +121,14 @@ public class LogicalTableConfig extends BaseJsonConfig {
     _refRealtimeTableName = refRealtimeTableName;
   }
 
+  public TimeBoundaryConfig getTimeBoundaryConfig() {
+    return _timeBoundaryConfig;
+  }
+
+  public void setTimeBoundaryConfig(TimeBoundaryConfig timeBoundaryConfig) {
+    _timeBoundaryConfig = timeBoundaryConfig;
+  }
+
   private JsonNode toJsonObject() {
     return DEFAULT_MAPPER.valueToTree(this);
   }
@@ -141,6 +151,10 @@ public class LogicalTableConfig extends BaseJsonConfig {
     }
   }
 
+  public boolean isHybridLogicalTable() {
+    return _refOfflineTableName != null && _refRealtimeTableName != null;
+  }
+
   @Override
   public String toString() {
     return toSingleLineJsonString();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java
new file mode 100644
index 0000000000..fda81adac1
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.Map;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+public class TimeBoundaryConfig extends BaseJsonConfig {
+  private String _boundaryStrategy;
+  private Map<String, Object> _parameters;
+
+  public TimeBoundaryConfig() {
+  }
+
+  public TimeBoundaryConfig(String boundaryStrategy, Map<String, Object> parameters) {
+    _boundaryStrategy = boundaryStrategy;
+    _parameters = parameters;
+  }
+
+  public String getBoundaryStrategy() {
+    return _boundaryStrategy;
+  }
+
+  public void setBoundaryStrategy(String boundaryStrategy) {
+    _boundaryStrategy = boundaryStrategy;
+  }
+
+  public Map<String, Object> getParameters() {
+    return _parameters;
+  }
+
+  public void setParameters(Map<String, Object> parameters) {
+    _parameters = parameters;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index eb1b7d3e17..b7c0692792 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -424,6 +424,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata");
   }
 
+  public String forSegmentMetadata(String tableName, TableType tableType) {
+    return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") + "?type=" + tableType.name();
+  }
+
   public String forListAllSegmentLineages(String tableName, String tableType) {
     return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType);
   }
@@ -653,4 +657,8 @@ public class ControllerRequestURLBuilder {
   public String forLogicalTableDelete(String logicalTableName) {
     return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName);
   }
+
+  public String forTableTimeBoundary(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "timeBoundary");
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
index 54ef55d1f0..1b09bc5993 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
@@ -23,6 +23,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
 
 
 public class LogicalTableConfigBuilder {
@@ -33,6 +34,7 @@ public class LogicalTableConfigBuilder {
   private QuotaConfig _quotaConfig;
   private String _refOfflineTableName;
   private String _refRealtimeTableName;
+  private TimeBoundaryConfig _timeBoundaryConfig;
 
 
   public LogicalTableConfigBuilder setTableName(String tableName) {
@@ -70,6 +72,11 @@ public class LogicalTableConfigBuilder {
     return this;
   }
 
+  public LogicalTableConfigBuilder setTimeBoundaryConfig(TimeBoundaryConfig timeBoundaryConfig) {
+    _timeBoundaryConfig = timeBoundaryConfig;
+    return this;
+  }
+
   public LogicalTableConfig build() {
     LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
     logicalTableConfig.setTableName(_tableName);
@@ -79,6 +86,7 @@ public class LogicalTableConfigBuilder {
     logicalTableConfig.setQuotaConfig(_quotaConfig);
     logicalTableConfig.setRefOfflineTableName(_refOfflineTableName);
     logicalTableConfig.setRefRealtimeTableName(_refRealtimeTableName);
+    logicalTableConfig.setTimeBoundaryConfig(_timeBoundaryConfig);
     return logicalTableConfig;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to