This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c9f0c47d0a Logical table time boundary (#15776) c9f0c47d0a is described below commit c9f0c47d0ad96607760b706a79802d1598222ef3 Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Tue May 20 11:55:29 2025 +0530 Logical table time boundary (#15776) * Time boundary computation for SSE. * Fix checkstyle * Fix checkstyle * Enhancing validations and tests for time boundary. * Addressing review comments. * Addressing review comments. * Addressing review comments. --------- Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai> --- ...okerResourceOnlineOfflineStateModelFactory.java | 22 ++-- .../BrokerUserDefinedMessageHandlerFactory.java | 27 +++++ .../pinot/broker/routing/BrokerRoutingManager.java | 71 ++++++++++-- .../messages/LogicalTableConfigRefreshMessage.java | 65 +++++++++++ .../pinot/common/metadata/ZKMetadataProvider.java | 4 + .../common/utils/LogicalTableConfigUtils.java | 29 +++++ .../helix/core/PinotHelixResourceManager.java | 31 +++++ .../resources/PinotLogicalTableResourceTest.java | 54 +++++++++ .../pinot/controller/helix/ControllerTest.java | 2 + .../BaseLogicalTableIntegrationTest.java | 127 +++++++++++++++++---- ...hOneOfflineOneRealtimeTableIntegrationTest.java | 44 +++++++ ...alTableWithOneRealtimeTableIntegrationTest.java | 29 +++++ ...elveOfflineOneRealtimeTableIntegrationTest.java | 43 +++++++ ...hTwoOfflineOneRealtimeTableIntegrationTest.java | 77 +++++++++++++ .../routing/table/LogicalTableRouteProvider.java | 27 +++-- .../timeboundary/MinTimeBoundaryStrategy.java | 79 +++++++++++++ .../query/timeboundary/TimeBoundaryStrategy.java | 46 ++++++++ .../timeboundary/TimeBoundaryStrategyService.java | 71 ++++++++++++ .../query/routing/table/BaseTableRouteTest.java | 26 ++++- ...ogicalTableRouteProviderCalculateRouteTest.java | 14 +++ .../LogicalTableRouteProviderGetRouteTest.java | 7 ++ .../timeboundary/MinTimeBoundaryStrategyTest.java | 122 ++++++++++++++++++++ .../TimeBoundaryStrategyServiceTest.java | 40 +++++++ .../apache/pinot/spi/data/LogicalTableConfig.java | 14 +++ .../apache/pinot/spi/data/TimeBoundaryConfig.java | 52 +++++++++ .../utils/builder/ControllerRequestURLBuilder.java | 8 ++ .../utils/builder/LogicalTableConfigBuilder.java | 8 ++ 27 files changed, 1087 insertions(+), 52 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java index 41f9e9ef87..dcf8a667e1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java @@ -75,18 +75,22 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact @Transition(from = "OFFLINE", to = "ONLINE") public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - String tableNameWithType = message.getPartitionName(); - LOGGER.info("Processing transition from OFFLINE to ONLINE for table: {}", tableNameWithType); + String physicalOrLogicalTable = message.getPartitionName(); + LOGGER.info("Processing transition from OFFLINE to ONLINE for table: {}", physicalOrLogicalTable); try { - _routingManager.buildRouting(tableNameWithType); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, - _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE))); - _queryQuotaManager.createDatabaseRateLimiter( - DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType)); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { + _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTable); + } else { + _routingManager.buildRouting(physicalOrLogicalTable); + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, physicalOrLogicalTable); + _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, + _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE))); + _queryQuotaManager.createDatabaseRateLimiter( + DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(physicalOrLogicalTable)); + } } catch (Exception e) { LOGGER.error("Caught exception while processing transition from OFFLINE to ONLINE for table: {}", - tableNameWithType, e); + physicalOrLogicalTable, e); throw e; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java index f4da13621e..033a126ea6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java @@ -27,6 +27,7 @@ import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManage import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; +import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; @@ -62,6 +63,8 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac return new RefreshSegmentMessageHandler(new SegmentRefreshMessage(message), context); case TableConfigRefreshMessage.REFRESH_TABLE_CONFIG_MSG_SUB_TYPE: return new RefreshTableConfigMessageHandler(new TableConfigRefreshMessage(message), context); + case LogicalTableConfigRefreshMessage.REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE: + return new RefreshLogicalTableConfigMessageHandler(new LogicalTableConfigRefreshMessage(message), context); case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE: return new RebuildRoutingTableMessageHandler(new RoutingTableRebuildMessage(message), context); case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE: @@ -139,6 +142,30 @@ public class BrokerUserDefinedMessageHandlerFactory implements MessageHandlerFac } } + private class RefreshLogicalTableConfigMessageHandler extends MessageHandler { + final String _logicalTableName; + + RefreshLogicalTableConfigMessageHandler(LogicalTableConfigRefreshMessage refreshMessage, + NotificationContext context) { + super(refreshMessage, context); + _logicalTableName = refreshMessage.getLogicalTableName(); + } + + @Override + public HelixTaskResult handleMessage() { + _routingManager.buildRoutingForLogicalTable(_logicalTableName); + HelixTaskResult result = new HelixTaskResult(); + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + LOGGER.error("Got error while refreshing logical table config for table: {} (error code: {}, error type: {})", + _logicalTableName, code, type, e); + } + } + private class RefreshDatabaseConfigMessageHandler extends MessageHandler { final String _databaseName; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 9c54f57618..801b39055b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -70,6 +70,8 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -418,17 +420,72 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle } /** - * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. - * @param physicalOrLogicalTable a physical table with type or logical table name + * Builds the routing for a logical table. This method is called when a logical table is created or updated. + * @param logicalTableName the name of the logical table */ - public synchronized void buildRouting(String physicalOrLogicalTable) { - // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { - LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); + public synchronized void buildRoutingForLogicalTable(String logicalTableName) { + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, logicalTableName); + Preconditions.checkState(logicalTableConfig != null, "Failed to find logical table config for: %s", + logicalTableConfig); + if (!logicalTableConfig.isHybridLogicalTable()) { + LOGGER.info("Skip time boundary manager setting for non hybrid logical table: {}", logicalTableName); return; } - String tableNameWithType = physicalOrLogicalTable; + LOGGER.info("Setting time boundary manager for logical table: {}", logicalTableName); + + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"), + "Invalid time boundary strategy: %s", timeBoundaryConfig.getBoundaryStrategy()); + List<String> includedTables = + (List<String>) timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of()); + + for (String tableNameWithType : includedTables) { + Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType), + "Invalid table in the time boundary config: %s", tableNameWithType); + try { + // build routing if it does not exist for the offline table + if (!_routingEntryMap.containsKey(tableNameWithType)) { + buildRouting(tableNameWithType); + } + + if (_routingEntryMap.get(tableNameWithType).getTimeBoundaryManager() != null) { + LOGGER.info("Skip time boundary manager init for table: {}", tableNameWithType); + continue; + } + + // init time boundary manager for the table + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + + String idealStatePath = getIdealStatePath(tableNameWithType); + IdealState idealState = getIdealState(idealStatePath); + Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType); + + String externalViewPath = getExternalViewPath(tableNameWithType); + ExternalView externalView = getExternalView(externalViewPath); + + Set<String> onlineSegments = getOnlineSegments(idealState); + SegmentPreSelector segmentPreSelector = + SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, _propertyStore); + Set<String> preSelectedOnlineSegments = segmentPreSelector.preSelect(onlineSegments); + + TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics); + timeBoundaryManager.init(idealState, externalView, preSelectedOnlineSegments); + + _routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager); + } catch (Exception e) { + LOGGER.error("Caught unexpected exception while setting time boundary manager for table: {}", tableNameWithType, + e); + } + } + } + + /** + * Builds the routing for a table. + * @param tableNameWithType the name of the table + */ + public synchronized void buildRouting(String tableNameWithType) { LOGGER.info("Building routing for table: {}", tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java new file mode 100644 index 0000000000..dc16baa4ee --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/LogicalTableConfigRefreshMessage.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import java.util.UUID; +import org.apache.helix.model.Message; +import org.apache.helix.zookeeper.datamodel.ZNRecord; + + +/** + * This (Helix) message is sent from the controller to brokers when a request is received to update the logical table + * config. + * + * NOTE: We keep the table name as a separate key instead of using the Helix PARTITION_NAME so that this message can be + * used for any resource. + */ +public class LogicalTableConfigRefreshMessage extends Message { + public static final String REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE = "REFRESH_LOGICAL_TABLE_CONFIG"; + + private static final String TABLE_NAME_KEY = "logicalTableName"; + + /** + * Constructor for the sender. + */ + public LogicalTableConfigRefreshMessage(String logicalTableName) { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + // Set the Pinot specific fields + // NOTE: DO NOT use Helix field "PARTITION_NAME" because it can be overridden by Helix while sending the message + ZNRecord znRecord = getRecord(); + znRecord.setSimpleField(TABLE_NAME_KEY, logicalTableName); + } + + /** + * Constructor for the receiver. + */ + public LogicalTableConfigRefreshMessage(Message message) { + super(message.getRecord()); + if (!message.getMsgSubType().equals(REFRESH_LOGICAL_TABLE_CONFIG_MSG_SUB_TYPE)) { + throw new IllegalArgumentException("Invalid message subtype:" + message.getMsgSubType()); + } + } + + public String getLogicalTableName() { + return getRecord().getSimpleField(TABLE_NAME_KEY); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 829a273f2d..029f5c3491 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -883,4 +883,8 @@ public class ZKMetadataProvider { public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT); } + + public static boolean isTableConfigExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { + return propertyStore.exists(constructPropertyStorePathForResourceConfig(tableName), AccessOption.PERSISTENT); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java index d9b5e69b1e..42d5e43f9f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java @@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -64,6 +65,10 @@ public class LogicalTableConfigUtils { if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY) != null) { builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY)); } + String timeBoundaryConfigJson = record.getSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY); + if (timeBoundaryConfigJson != null) { + builder.setTimeBoundaryConfig(JsonUtils.stringToObject(timeBoundaryConfigJson, TimeBoundaryConfig.class)); + } Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); for (Map.Entry<String, String> entry : record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY) @@ -105,6 +110,10 @@ public class LogicalTableConfigUtils { record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY, logicalTableConfig.getRefRealtimeTableName()); } + if (logicalTableConfig.getTimeBoundaryConfig() != null) { + record.setSimpleField(LogicalTableConfig.TIME_BOUNDARY_CONFIG_KEY, + logicalTableConfig.getTimeBoundaryConfig().toJsonString()); + } return record; } @@ -199,5 +208,25 @@ public class LogicalTableConfigUtils { throw new IllegalArgumentException( "Invalid logical table. Reason: Schema with same name as logical table '" + tableName + "' does not exist"); } + + // validate time boundary config is not null for hybrid tables + TimeBoundaryConfig timeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + if (logicalTableConfig.isHybridLogicalTable() && timeBoundaryConfig == null) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'timeBoundaryConfig' should not be null for hybrid logical tables"); + } + + // time boundary strategy should not be null or empty + if (timeBoundaryConfig != null && StringUtils.isEmpty(timeBoundaryConfig.getBoundaryStrategy())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'timeBoundaryConfig.boundaryStrategy' should not be null or empty"); + } + + // validate time boundary config parameters + if (timeBoundaryConfig != null + && (timeBoundaryConfig.getParameters() == null || timeBoundaryConfig.getParameters().isEmpty())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'timeBoundaryConfig.parameters' should not be null or empty"); + } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 81c14fc2ca..892e3fbc11 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -105,6 +105,7 @@ import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage; import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage; +import org.apache.pinot.common.messages.LogicalTableConfigRefreshMessage; import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.RunPeriodicTaskMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; @@ -179,6 +180,7 @@ import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; @@ -2150,6 +2152,14 @@ public class PinotHelixResourceManager { updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); } + TimeBoundaryConfig oldTimeBoundaryConfig = oldLogicalTableConfig.getTimeBoundaryConfig(); + TimeBoundaryConfig newTimeBoundaryConfig = logicalTableConfig.getTimeBoundaryConfig(); + // compare the old and new time boundary config and send message if they are different + if ((oldTimeBoundaryConfig != null && !oldTimeBoundaryConfig.equals(newTimeBoundaryConfig)) + || (oldTimeBoundaryConfig == null && newTimeBoundaryConfig != null)) { + sendLogicalTableConfigRefreshMessage(logicalTableConfig.getTableName()); + } + LOGGER.info("Updated logical table {}: Successfully updated table", tableName); } @@ -3172,6 +3182,27 @@ public class PinotHelixResourceManager { } } + private void sendLogicalTableConfigRefreshMessage(String logicalTableName) { + LogicalTableConfigRefreshMessage refreshMessage = new LogicalTableConfigRefreshMessage(logicalTableName); + + // Send logical table config refresh message to brokers + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); + recipientCriteria.setSessionSpecific(true); + recipientCriteria.setPartition(logicalTableName); + // Send message with no callback and infinite timeout on the recipient + int numMessagesSent = + _helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, null, -1); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} logical table config refresh messages to brokers for table: {}", numMessagesSent, + logicalTableName); + } else { + LOGGER.warn("No logical table config refresh message sent to brokers for table: {}", logicalTableName); + } + } + private void sendApplicationQpsQuotaRefreshMessage(String appName) { ApplicationQpsQuotaRefreshMessage message = new ApplicationQpsQuotaRefreshMessage(appName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java index ced4a3f6c9..3458777e8d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java @@ -31,6 +31,7 @@ import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -333,6 +334,59 @@ public class PinotLogicalTableResourceTest extends ControllerTest { throwable.getMessage()); } + public void testLogicalTableTimeBoundaryConfigValidation() + throws IOException { + // Test logical table time boundary strategy validation + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_8")); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + + // Test logical table with no time boundary config + logicalTableConfig.setTimeBoundaryConfig(null); + Throwable throwable = expectThrows(IOException.class, () -> { + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: 'timeBoundaryConfig' should not be null for hybrid logical tables"), + throwable.getMessage()); + + // Test logical table with time boundary config but null strategy + logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig(null, null)); + throwable = expectThrows(IOException.class, () -> { + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: 'timeBoundaryConfig.strategy' should not be null or empty for hybrid logical tables"), + throwable.getMessage()); + + // Test logical table with time boundary config but empty strategy + logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("", null)); + throwable = expectThrows(IOException.class, () -> { + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: 'timeBoundaryConfig.strategy' should not be null or empty for hybrid logical tables"), + throwable.getMessage()); + + // Test logical table with time boundary config but null parameters + logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min", null)); + throwable = expectThrows(IOException.class, () -> { + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: 'timeBoundaryConfig.parameters' should not be null or empty for hybrid logical tables"), + throwable.getMessage()); + + // Test logical table with time boundary config but empty parameters + logicalTableConfig.setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of())); + throwable = expectThrows(IOException.class, () -> { + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: 'timeBoundaryConfig.parameters' should not be null or empty for hybrid logical tables"), + throwable.getMessage()); + } + @Test public void testLogicalTableWithSameNameNotAllowed() throws IOException { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 9f775d3201..38c29dfeb7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -81,6 +81,7 @@ import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -408,6 +409,7 @@ public class ControllerTest { .setRefRealtimeTableName(realtimeTableName) .setQuotaConfig(new QuotaConfig(null, "999")) .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L)) + .setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames))) .setPhysicalTableConfigMap(physicalTableConfigMap); return builder.build(); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java index 7405a00ce5..3f2bffd888 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java @@ -24,9 +24,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.pinot.controller.helix.ControllerRequestClient; import org.apache.pinot.controller.helix.ControllerTest; @@ -39,7 +41,9 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -63,8 +67,8 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra private static final String DEFAULT_TENANT = "DefaultTenant"; private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable"; protected static final String DEFAULT_TABLE_NAME = "physicalTable"; - private static final int NUM_OFFLINE_SEGMENTS = 12; protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = null; + protected List<File> _avroFiles; @BeforeSuite public void setUpSuite() @@ -99,11 +103,6 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra LOGGER.info("Finished tearing down integration test suite"); } - @Override - protected String getTableName() { - return DEFAULT_TABLE_NAME; - } - @BeforeClass public void setUp() throws Exception { @@ -113,10 +112,12 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra _helixResourceManager = _sharedClusterTestSuite._helixResourceManager; } - List<File> avroFiles = getAllAvroFiles(); - int numSegmentsPerTable = NUM_OFFLINE_SEGMENTS / getOfflineTableNames().size(); - int index = 0; - for (String tableName : getOfflineTableNames()) { + _avroFiles = getAllAvroFiles(); + Map<String, List<File>> offlineTableDataFiles = getOfflineTableDataFiles(); + for (Map.Entry<String, List<File>> entry : offlineTableDataFiles.entrySet()) { + String tableName = entry.getKey(); + List<File> avroFilesForTable = entry.getValue(); + File tarDir = new File(_tarDir, tableName); TestUtils.ensureDirectoriesExistAndEmpty(tarDir); @@ -128,25 +129,37 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra TableConfig offlineTableConfig = createOfflineTableConfig(tableName); addTableConfig(offlineTableConfig); - List<File> offlineAvroFiles = new ArrayList<>(numSegmentsPerTable); - for (int i = index; i < index + numSegmentsPerTable; i++) { - offlineAvroFiles.add(avroFiles.get(i)); - } - index += numSegmentsPerTable; - // Create and upload segments - ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFilesForTable, offlineTableConfig, schema, 0, _segmentDir, tarDir); uploadSegments(tableName, tarDir); } + // create realtime table + Map<String, List<File>> realtimeTableDataFiles = getRealtimeTableDataFiles(); + for (Map.Entry<String, List<File>> entry : realtimeTableDataFiles.entrySet()) { + String tableName = entry.getKey(); + List<File> avroFilesForTable = entry.getValue(); + // create and upload the schema and table config + Schema schema = createSchema(getSchemaFileName()); + schema.setSchemaName(tableName); + addSchema(schema); + + TableConfig realtimeTableConfig = createRealtimeTableConfig(avroFilesForTable.get(0)); + realtimeTableConfig.setTableName(tableName); + addTableConfig(realtimeTableConfig); + + // push avro files into kafka + pushAvroIntoKafka(avroFilesForTable); + } + createLogicalTable(); // Set up the H2 connection - setUpH2Connection(avroFiles); + setUpH2Connection(_avroFiles); // Initialize the query generator - setUpQueryGenerator(avroFiles); + setUpQueryGenerator(_avroFiles); // Wait for all documents loaded waitForAllDocsLoaded(600_000L); @@ -158,11 +171,70 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra cleanup(); } - protected abstract List<String> getOfflineTableNames(); + protected List<String> getOfflineTableNames() { + return List.of(); + } + + protected List<String> getRealtimeTableNames() { + return List.of(); + } + + protected Map<String, List<File>> getOfflineTableDataFiles() { + List<String> offlineTableNames = getOfflineTableNames(); + return !offlineTableNames.isEmpty() ? distributeFilesToTables(offlineTableNames, _avroFiles) : Map.of(); + } + + protected Map<String, List<File>> getRealtimeTableDataFiles() { + List<String> realtimeTableNames = getRealtimeTableNames(); + return !realtimeTableNames.isEmpty() ? distributeFilesToTables(realtimeTableNames, _avroFiles) : Map.of(); + } + + protected Map<String, List<File>> distributeFilesToTables(List<String> tableNames, List<File> avroFiles) { + Map<String, List<File>> tableNameToFilesMap = new HashMap<>(); + + // Initialize the map with empty lists for each table name + tableNames.forEach(table -> tableNameToFilesMap.put(table, new ArrayList<>())); + + // Round-robin distribution of files to table names + for (int i = 0; i < avroFiles.size(); i++) { + String tableName = tableNames.get(i % tableNames.size()); + tableNameToFilesMap.get(tableName).add(avroFiles.get(i)); + } + return tableNameToFilesMap; + } + + private List<String> getTimeBoundaryTable() { + String timeBoundaryTable = null; + long maxEndTimeMillis = Long.MIN_VALUE; + try { + for (String tableName : getOfflineTableNames()) { + String url = _controllerRequestURLBuilder.forSegmentMetadata(tableName, TableType.OFFLINE); + String response = ControllerTest.sendGetRequest(url); + JsonNode jsonNode = JsonUtils.stringToJsonNode(response); + Iterator<String> stringIterator = jsonNode.fieldNames(); + while (stringIterator.hasNext()) { + String segmentName = stringIterator.next(); + JsonNode segmentJsonNode = jsonNode.get(segmentName); + long endTimeMillis = segmentJsonNode.get("endTimeMillis").asLong(); + if (endTimeMillis > maxEndTimeMillis) { + maxEndTimeMillis = endTimeMillis; + timeBoundaryTable = tableName; + } + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to get the time boundary table", e); + } + return timeBoundaryTable != null ? List.of(TableNameBuilder.OFFLINE.tableNameWithType(timeBoundaryTable)) + : List.of(); + } protected List<String> getPhysicalTableNames() { - return getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType) + List<String> offlineTableNames = getOfflineTableNames().stream().map(TableNameBuilder.OFFLINE::tableNameWithType) .collect(Collectors.toList()); + List<String> realtimeTableNames = getRealtimeTableNames().stream() + .map(TableNameBuilder.REALTIME::tableNameWithType).collect(Collectors.toList()); + return Stream.concat(offlineTableNames.stream(), realtimeTableNames.stream()).collect(Collectors.toList()); } protected String getLogicalTableName() { @@ -205,7 +277,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra // @formatter:on } - public static LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames, + public LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames, String brokerTenant) { Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); for (String physicalTableName : physicalTableNames) { @@ -216,9 +288,16 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra String realtimeTableName = physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null); LogicalTableConfigBuilder builder = - new LogicalTableConfigBuilder().setTableName(tableName).setBrokerTenant(brokerTenant) - .setRefOfflineTableName(offlineTableName).setRefRealtimeTableName(realtimeTableName) + new LogicalTableConfigBuilder().setTableName(tableName) + .setBrokerTenant(brokerTenant) + .setRefOfflineTableName(offlineTableName) + .setRefRealtimeTableName(realtimeTableName) .setPhysicalTableConfigMap(physicalTableConfigMap); + if (!getOfflineTableNames().isEmpty() && !getRealtimeTableNames().isEmpty()) { + builder.setTimeBoundaryConfig( + new TimeBoundaryConfig("min", Map.of("includedTables", getTimeBoundaryTable())) + ); + } return builder.build(); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java new file mode 100644 index 0000000000..d6f599ce1b --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.logicaltable; + +import java.io.File; +import java.util.List; +import java.util.Map; + + +public class LogicalTableWithOneOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest { + + @Override + protected List<String> getOfflineTableNames() { + return List.of("o_1"); + } + + @Override + protected List<String> getRealtimeTableNames() { + return List.of("r_1"); + } + + @Override + protected Map<String, List<File>> getRealtimeTableDataFiles() { + // Overlapping data files for the hybrid table + return distributeFilesToTables(getRealtimeTableNames(), + _avroFiles.subList(_avroFiles.size() - 4, _avroFiles.size())); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java new file mode 100644 index 0000000000..dee7e1f59c --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithOneRealtimeTableIntegrationTest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.logicaltable; + +import java.util.List; + + +public class LogicalTableWithOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest { + @Override + protected List<String> getRealtimeTableNames() { + return List.of("r_1"); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java new file mode 100644 index 0000000000..96eb11e2fe --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.logicaltable; + +import java.io.File; +import java.util.List; +import java.util.Map; + + +public class LogicalTableWithTwelveOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest { + @Override + protected List<String> getOfflineTableNames() { + return List.of("o_1", "o_2", "o_3", "o_4", "o_5", "o_6", "o_7", "o_8", "o_9", "o_10", "o_11", "o_12"); + } + + @Override + protected List<String> getRealtimeTableNames() { + return List.of("r_1"); + } + + @Override + protected Map<String, List<File>> getRealtimeTableDataFiles() { + // Overlapping data files for the hybrid table + return distributeFilesToTables(getRealtimeTableNames(), + _avroFiles.subList(_avroFiles.size() - 2, _avroFiles.size())); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java new file mode 100644 index 0000000000..d406b2ad56 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.logicaltable; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.Test; + + +public class LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends BaseLogicalTableIntegrationTest { + @Override + protected List<String> getOfflineTableNames() { + return List.of("o_1", "o_2"); + } + + @Override + protected List<String> getRealtimeTableNames() { + return List.of("r_1"); + } + + @Override + protected Map<String, List<File>> getRealtimeTableDataFiles() { + // Overlapping data files for the hybrid table + return distributeFilesToTables(getRealtimeTableNames(), + _avroFiles.subList(_avroFiles.size() - 4, _avroFiles.size())); + } + + @Test + public void testUpdateLogicalTableTimeBoundary() + throws Exception { + LogicalTableConfig logicalTableConfig = getLogicalTableConfig(getLogicalTableName()); + updateTimeBoundaryTableInLogicalTable(logicalTableConfig); + + // Wait to ensure logical table config update helix message is processed in broker + waitForAllDocsLoaded(5_000); + + // Run the tests + testGeneratedQueries(); + testHardcodedQueries(); + testQueriesFromQueryFile(); + } + + private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig logicalTableConfig) + throws IOException { + List<String> includedTables = + (List<String>) logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables"); + + String timeBoundaryTableName = TableNameBuilder.extractRawTableName(includedTables.get(0)); + String newTimeBoundaryTableName = timeBoundaryTableName.equals("o_1") ? "o_2" : "o_1"; + newTimeBoundaryTableName = TableNameBuilder.OFFLINE.tableNameWithType(newTimeBoundaryTableName); + + Map<String, Object> parameters = Map.of("includedTables", List.of(newTimeBoundaryTableName)); + logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters); + + updateLogicalTableConfig(logicalTableConfig.getTableName(), logicalTableConfig); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java index 60ddd345ae..fe3937b8b0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/table/LogicalTableRouteProvider.java @@ -26,6 +26,8 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.TableRouteInfo; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.LogicalTableConfig; @@ -78,6 +80,20 @@ public class LogicalTableRouteProvider implements TableRouteProvider { routeInfo.setRealtimeTableConfig(realtimeTableConfig); } routeInfo.setQueryConfig(logicalTable.getQueryConfig()); + + TimeBoundaryInfo timeBoundaryInfo; + if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) { + String boundaryStrategy = logicalTable.getTimeBoundaryConfig().getBoundaryStrategy(); + TimeBoundaryStrategy timeBoundaryStrategy = + TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(boundaryStrategy); + timeBoundaryInfo = timeBoundaryStrategy.computeTimeBoundary(logicalTable, tableCache, routingManager); + if (timeBoundaryInfo == null) { + LOGGER.info("No time boundary info found for logical hybrid table: "); + routeInfo.setOfflineTables(null); + } else { + routeInfo.setTimeBoundaryInfo(timeBoundaryInfo); + } + } return routeInfo; } @@ -111,17 +127,6 @@ public class LogicalTableRouteProvider implements TableRouteProvider { } } - TimeBoundaryInfo timeBoundaryInfo = null; - if (routeInfo.hasRealtime() && routeInfo.hasOffline()) { - timeBoundaryInfo = routingManager.getTimeBoundaryInfo(routeInfo.getOfflineTables().get(0).getOfflineTableName()); - if (timeBoundaryInfo == null) { - LOGGER.debug("No time boundary info found for hybrid table: "); - routeInfo.setOfflineTables(null); - } else { - routeInfo.setTimeBoundaryInfo(timeBoundaryInfo); - } - } - //Set BrokerRequests to NULL if there is no route. if (routeInfo.getOfflineExecutionServers().isEmpty()) { routeInfo.setOfflineBrokerRequest(null); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java new file mode 100644 index 0000000000..5d3c98596b --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.timeboundary; + +import com.google.auto.service.AutoService; +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +@AutoService(TimeBoundaryStrategy.class) +public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy { + + @Override + public String getName() { + return "min"; + } + + @Override + public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, + RoutingManager routingManager) { + TimeBoundaryInfo minTimeBoundaryInfo = null; + long minTimeBoundary = Long.MAX_VALUE; + Map<String, Object> parameters = logicalTableConfig.getTimeBoundaryConfig().getParameters(); + List<String> includedTables = + parameters != null ? (List) parameters.getOrDefault("includedTables", List.of()) : List.of(); + for (String physicalTableName : includedTables) { + TimeBoundaryInfo current = routingManager.getTimeBoundaryInfo(physicalTableName); + if (current != null) { + String rawTableName = TableNameBuilder.extractRawTableName(physicalTableName); + Schema schema = tableCache.getSchema(rawTableName); + TableConfig tableConfig = tableCache.getTableConfig(physicalTableName); + Preconditions.checkArgument(tableConfig != null, + "Table config not found for table: %s", physicalTableName); + Preconditions.checkArgument(schema != null, + "Schema not found for table: %s", physicalTableName); + String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); + DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName); + Preconditions.checkArgument(dateTimeFieldSpec != null, + "Time column not found in schema for table: %s", physicalTableName); + DateTimeFormatSpec specFormatSpec = dateTimeFieldSpec.getFormatSpec(); + long currentTimeBoundaryMillis = specFormatSpec.fromFormatToMillis(current.getTimeValue()); + if (minTimeBoundaryInfo == null) { + minTimeBoundaryInfo = current; + minTimeBoundary = currentTimeBoundaryMillis; + } else if (minTimeBoundary > currentTimeBoundaryMillis) { + minTimeBoundaryInfo = current; + minTimeBoundary = currentTimeBoundaryMillis; + } + } + } + return minTimeBoundaryInfo; + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java new file mode 100644 index 0000000000..c1b97f28c5 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.timeboundary; + +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.spi.data.LogicalTableConfig; + + +public interface TimeBoundaryStrategy { + + /** + * Returns the time boundary strategy name. + * + * @return The time boundary strategy name. + */ + String getName(); + + /** + * Computes the time boundary for the given physical table names. + * + * @param logicalTableConfig The logical table configuration. + * @param tableCache The table cache to use for fetching table metadata. + * @param routingManager The routing manager to use for computing the time boundary. + * @return The computed time boundary information. + */ + TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, + RoutingManager routingManager); +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java new file mode 100644 index 0000000000..0e3f6af4aa --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyService.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.timeboundary; + +import java.util.Map; +import java.util.ServiceLoader; + + +public class TimeBoundaryStrategyService { + + private static volatile TimeBoundaryStrategyService _instance = fromServiceLoader(); + private final Map<String, TimeBoundaryStrategy> _strategyMap; + + private TimeBoundaryStrategyService(Map<String, TimeBoundaryStrategy> strategyMap) { + _strategyMap = strategyMap; + } + + public static TimeBoundaryStrategyService fromServiceLoader() { + Map<String, TimeBoundaryStrategy> strategyMap = new java.util.HashMap<>(); + for (TimeBoundaryStrategy strategy : ServiceLoader.load(TimeBoundaryStrategy.class)) { + String strategyName = strategy.getName(); + if (strategyMap.containsKey(strategyName)) { + throw new IllegalStateException("Duplicate TimeBoundaryStrategy found: " + strategyName); + } + strategyMap.put(strategyName, strategy); + } + return new TimeBoundaryStrategyService(strategyMap); + } + + /** + * Returns the singleton instance of the TimeBoundaryStrategyService. + * + * @return The singleton instance of the TimeBoundaryStrategyService. + */ + public static TimeBoundaryStrategyService getInstance() { + return _instance; + } + + /** + * Sets the singleton instance of the TimeBoundaryStrategyService. + * + * @param service The new instance to set. + */ + public static void setInstance(TimeBoundaryStrategyService service) { + _instance = service; + } + + public TimeBoundaryStrategy getTimeBoundaryStrategy(String name) { + TimeBoundaryStrategy strategy = _instance._strategyMap.get(name); + if (strategy == null) { + throw new IllegalArgumentException("No TimeBoundaryStrategy found for name: " + name); + } + return strategy; + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java index dd8d206b5a..1c08dca4ae 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/BaseTableRouteTest.java @@ -31,22 +31,31 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.ServerRouteInfo; +import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.query.testutils.MockRoutingManagerFactory; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy; +import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.mockito.MockedStatic; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -81,6 +90,7 @@ public class BaseTableRouteTest { public static final Map<String, Schema> TABLE_SCHEMAS = new HashMap<>(); private static final Set<String> DISABLED_TABLES = new HashSet<>(); + static { TABLE_SCHEMAS.put("a_REALTIME", getSchemaBuilder("a").build()); TABLE_SCHEMAS.put("b_OFFLINE", getSchemaBuilder("b").build()); @@ -125,7 +135,8 @@ public class BaseTableRouteTest { TableCache _tableCache; ImplicitHybridTableRouteProvider _hybridTableRouteProvider; LogicalTableRouteProvider _logicalTableRouteProvider; - + TimeBoundaryStrategy _timeBoundaryStrategy; + MockedStatic<TimeBoundaryStrategyService> _timeBoundaryStrategyFactoryMockedStatic; @BeforeClass public void setUp() { @@ -154,6 +165,17 @@ public class BaseTableRouteTest { _tableCache = factory.buildTableCache(); _hybridTableRouteProvider = new ImplicitHybridTableRouteProvider(); _logicalTableRouteProvider = new LogicalTableRouteProvider(); + _timeBoundaryStrategyFactoryMockedStatic = mockStatic(TimeBoundaryStrategyService.class); + _timeBoundaryStrategy = mock(TimeBoundaryStrategy.class); + TimeBoundaryStrategyService mockService = mock(TimeBoundaryStrategyService.class); + when(TimeBoundaryStrategyService.getInstance()).thenReturn(mockService); + when(mockService.getTimeBoundaryStrategy(any())).thenReturn(_timeBoundaryStrategy); + when(_timeBoundaryStrategy.computeTimeBoundary(any(), any(), any())).thenReturn(mock(TimeBoundaryInfo.class)); + } + + @AfterClass + public void tearDown() { + _timeBoundaryStrategyFactoryMockedStatic.close(); } @DataProvider(name = "offlineTableProvider") @@ -413,6 +435,8 @@ public class BaseTableRouteTest { builder.setPhysicalTableConfigMap(tableConfigMap); builder.setBrokerTenant("brokerTenant"); + builder.setTimeBoundaryConfig( + new TimeBoundaryConfig("min", Map.of("includedTables", List.of("randomTable_OFFLINE")))); LogicalTableConfig logicalTable = builder.build(); when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable); diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java index f7280b3319..f76a3bfdb9 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderCalculateRouteTest.java @@ -33,6 +33,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -95,6 +96,12 @@ public class LogicalTableRouteProviderCalculateRouteTest extends BaseTableRouteT assertNotNull(requestMap); assertFalse(requestMap.isEmpty()); } + + if (routeInfo.isHybrid()) { + assertNotNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info should not be null for hybrid table"); + } else { + assertNull(routeInfo.getTimeBoundaryInfo(), "Time boundary info should be null for non-hybrid table"); + } } @Test(dataProvider = "offlineTableAndRouteProvider") @@ -107,6 +114,13 @@ public class LogicalTableRouteProviderCalculateRouteTest extends BaseTableRouteT assertTableRoute(tableName, "realtimeTableAndRouteProvider", null, expectedRoutingTable, false, true); } + @Test(dataProvider = "hybridTableAndRouteProvider") + void testHybridTableRoute(String tableName, Map<String, Set<String>> expectedOfflineRoutingTable, + Map<String, Set<String>> expectedRealtimeRoutingTable) { + assertTableRoute(tableName, "hybridTableAndRouteProvider", expectedOfflineRoutingTable, + expectedRealtimeRoutingTable, expectedOfflineRoutingTable != null, expectedRealtimeRoutingTable != null); + } + @Test(dataProvider = "routeNotExistsProvider") void testRouteNotExists(String tableName) { assertTableRoute(tableName, "routeNotExistsProvider", null, null, false, false); diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java index e4417a9daa..d9d0a9fa51 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/routing/table/LogicalTableRouteProviderGetRouteTest.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.pinot.core.transport.TableRouteInfo; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; +import org.apache.pinot.spi.data.TimeBoundaryConfig; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -68,6 +69,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testOfflineTable"); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isOffline(), "The table should be offline"); + assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info"); } @Test(dataProvider = "realtimeTableProvider") @@ -75,6 +77,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testRealtimeTable"); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isRealtime(), "The table should be realtime"); + assertNull(routeInfo.getTimeBoundaryInfo(), "The table should not have time boundary info"); } @Test(dataProvider = "hybridTableProvider") @@ -82,6 +85,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { TableRouteInfo routeInfo = getLogicalTableRouteInfo(tableName, "testHybridTable"); assertTrue(routeInfo.isExists(), "The table should exist"); assertTrue(routeInfo.isHybrid(), "The table should be hybrid"); + assertNotNull(routeInfo.getTimeBoundaryInfo(), "The table should have time boundary info"); } @Test(dataProvider = "routeExistsProvider") @@ -216,6 +220,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { assertTrue(routeInfo.isRouteExists()); assertFalse(routeInfo.isDisabled()); assertNull(routeInfo.getDisabledTableNames()); + assertNotNull(routeInfo.getTimeBoundaryInfo()); } @Test(dataProvider = "disabledTableProvider") @@ -259,6 +264,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { } logicalTable.setPhysicalTableConfigMap(tableConfigMap); logicalTable.setBrokerTenant("brokerTenant"); + logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames))); when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable); TableRouteInfo routeInfo = @@ -297,6 +303,7 @@ public class LogicalTableRouteProviderGetRouteTest extends BaseTableRouteTest { } logicalTable.setPhysicalTableConfigMap(tableConfigMap); logicalTable.setBrokerTenant("brokerTenant"); + logicalTable.setTimeBoundaryConfig(new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableNames))); when(_tableCache.getLogicalTableConfig(eq(logicalTableName))).thenReturn(logicalTable); TableRouteInfo routeInfo = diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java new file mode 100644 index 0000000000..464ff805d9 --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategyTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.timeboundary; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeBoundaryConfig; +import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertSame; + + +public class MinTimeBoundaryStrategyTest { + + TableCache _mockTableCache; + RoutingManager _mockRoutingManager; + TimeBoundaryStrategy _minTimeBoundaryStrategy = new MinTimeBoundaryStrategy(); + + public void setupMocks(Map<String, TimeBoundaryInfo> data) { + for (String tableName : data.keySet()) { + if (TableNameBuilder.isRealtimeTableResource(tableName)) { + continue; + } + TimeBoundaryInfo timeBoundaryInfo = data.get(tableName); + when(_mockRoutingManager.getTimeBoundaryInfo(tableName)).thenReturn(timeBoundaryInfo); + Schema schema = mock(Schema.class); + TableConfig tableConfig = mock(TableConfig.class); + SegmentsValidationAndRetentionConfig validationMock = mock(SegmentsValidationAndRetentionConfig.class); + DateTimeFieldSpec dateTimeFieldSpec = mock(DateTimeFieldSpec.class); + DateTimeFormatSpec dateTimeFormatSpec = mock(DateTimeFormatSpec.class); + + when(_mockTableCache.getSchema(TableNameBuilder.extractRawTableName(tableName))).thenReturn(schema); + when(_mockTableCache.getTableConfig(tableName)).thenReturn(tableConfig); + when(tableConfig.getValidationConfig()).thenReturn(validationMock); + when(validationMock.getTimeColumnName()).thenReturn(timeBoundaryInfo.getTimeColumn()); + when(schema.getSpecForTimeColumn(timeBoundaryInfo.getTimeColumn())).thenReturn(dateTimeFieldSpec); + when(dateTimeFieldSpec.getFormatSpec()).thenReturn(dateTimeFormatSpec); + when(dateTimeFormatSpec.fromFormatToMillis(any())).thenReturn(Long.valueOf(timeBoundaryInfo.getTimeValue())); + } + } + + + @DataProvider + public Object[][] timeBoundaryData() { + Map<String, TimeBoundaryInfo> timeBoundaryInfoMap = Map.of( + "table1_OFFLINE", new TimeBoundaryInfo("timeColumn1", "1747134822000"), + "table2_OFFLINE", new TimeBoundaryInfo("timeColumn2", "1747134844000"), + "table3_OFFLINE", new TimeBoundaryInfo("timeColumn3", "1747134866000"), + "table4_OFFLINE", new TimeBoundaryInfo("timeColumn4", "1747134888000"), + "table5_REALTIME", new TimeBoundaryInfo("timeColumn5", "1747134900000") + ); + + return new Object[][]{ + {timeBoundaryInfoMap, List.of("table3_OFFLINE"), "table3_OFFLINE"}, + {timeBoundaryInfoMap, List.of("Invalid_OFFLINE"), "Invalid_OFFLINE"}, + {timeBoundaryInfoMap, List.of("table2_OFFLINE", "table3_OFFLINE"), "table2_OFFLINE"}, + {timeBoundaryInfoMap, List.of("table3_OFFLINE", "table2_OFFLINE", "table4_OFFLINE"), "table2_OFFLINE"}, + {timeBoundaryInfoMap, List.of(), "empty_includedTables_OFFLINE"} + }; + } + + + @Test(dataProvider = "timeBoundaryData") + public void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> timeBoundaryInfoMap, + List<String> includedTables, String expectedTableName) { + Map<String, Object> parameters = Map.of("includedTables", includedTables); + testComputeTimeBoundary(timeBoundaryInfoMap, expectedTableName, parameters); + } + + private void testComputeTimeBoundary(Map<String, TimeBoundaryInfo> timeBoundaryInfoMap, String expectedTableName, + Map<String, Object> parameters) { + setupMocks(timeBoundaryInfoMap); + TimeBoundaryInfo timeBoundaryInfo = _minTimeBoundaryStrategy.computeTimeBoundary( + createLogicalTableConfig(parameters), _mockTableCache, _mockRoutingManager); + assertSame(timeBoundaryInfo, timeBoundaryInfoMap.get(expectedTableName)); + } + + @BeforeMethod + public void setUp() { + _mockTableCache = mock(TableCache.class); + _mockRoutingManager = mock(RoutingManager.class); + } + + private LogicalTableConfig createLogicalTableConfig(Map<String, Object> parameters) { + LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() + .setTableName("logical_table") + .setTimeBoundaryConfig(new TimeBoundaryConfig("min", parameters)); + return builder.build(); + } +} diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java new file mode 100644 index 0000000000..05993a437f --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategyServiceTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.timeboundary; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertTrue; + + +public class TimeBoundaryStrategyServiceTest { + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "No TimeBoundaryStrategy found for name: invalidStrategy") + public void testInvalidTimeBoundaryStrategy() { + TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy("invalidStrategy"); + } + + @Test + public void testMinTimeBoundaryStrategy() { + TimeBoundaryStrategy timeBoundaryStrategy = TimeBoundaryStrategyService.getInstance() + .getTimeBoundaryStrategy("min"); + assertTrue(timeBoundaryStrategy instanceof MinTimeBoundaryStrategy, "Expected MinTimeBoundaryStrategy instance"); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java index 2a427ce9dc..2c52148098 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java @@ -42,6 +42,7 @@ public class LogicalTableConfig extends BaseJsonConfig { public static final String QUOTA_CONFIG_KEY = "quota"; public static final String REF_OFFLINE_TABLE_NAME_KEY = "refOfflineTableName"; public static final String REF_REALTIME_TABLE_NAME_KEY = "refRealtimeTableName"; + public static final String TIME_BOUNDARY_CONFIG_KEY = "timeBoundaryConfig"; private String _tableName; private String _brokerTenant; @@ -52,6 +53,7 @@ public class LogicalTableConfig extends BaseJsonConfig { private QuotaConfig _quotaConfig; private String _refOfflineTableName; private String _refRealtimeTableName; + private TimeBoundaryConfig _timeBoundaryConfig; public static LogicalTableConfig fromString(String logicalTableString) throws IOException { @@ -119,6 +121,14 @@ public class LogicalTableConfig extends BaseJsonConfig { _refRealtimeTableName = refRealtimeTableName; } + public TimeBoundaryConfig getTimeBoundaryConfig() { + return _timeBoundaryConfig; + } + + public void setTimeBoundaryConfig(TimeBoundaryConfig timeBoundaryConfig) { + _timeBoundaryConfig = timeBoundaryConfig; + } + private JsonNode toJsonObject() { return DEFAULT_MAPPER.valueToTree(this); } @@ -141,6 +151,10 @@ public class LogicalTableConfig extends BaseJsonConfig { } } + public boolean isHybridLogicalTable() { + return _refOfflineTableName != null && _refRealtimeTableName != null; + } + @Override public String toString() { return toSingleLineJsonString(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java new file mode 100644 index 0000000000..fda81adac1 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeBoundaryConfig.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.data; + +import java.util.Map; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +public class TimeBoundaryConfig extends BaseJsonConfig { + private String _boundaryStrategy; + private Map<String, Object> _parameters; + + public TimeBoundaryConfig() { + } + + public TimeBoundaryConfig(String boundaryStrategy, Map<String, Object> parameters) { + _boundaryStrategy = boundaryStrategy; + _parameters = parameters; + } + + public String getBoundaryStrategy() { + return _boundaryStrategy; + } + + public void setBoundaryStrategy(String boundaryStrategy) { + _boundaryStrategy = boundaryStrategy; + } + + public Map<String, Object> getParameters() { + return _parameters; + } + + public void setParameters(Map<String, Object> parameters) { + _parameters = parameters; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index eb1b7d3e17..b7c0692792 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -424,6 +424,10 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName), "metadata"); } + public String forSegmentMetadata(String tableName, TableType tableType) { + return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") + "?type=" + tableType.name(); + } + public String forListAllSegmentLineages(String tableName, String tableType) { return StringUtil.join("/", _baseUrl, "segments", tableName, "lineage?type=" + tableType); } @@ -653,4 +657,8 @@ public class ControllerRequestURLBuilder { public String forLogicalTableDelete(String logicalTableName) { return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); } + + public String forTableTimeBoundary(String tableName) { + return StringUtil.join("/", _baseUrl, "tables", tableName, "timeBoundary"); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java index 54ef55d1f0..1b09bc5993 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java @@ -23,6 +23,7 @@ import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; +import org.apache.pinot.spi.data.TimeBoundaryConfig; public class LogicalTableConfigBuilder { @@ -33,6 +34,7 @@ public class LogicalTableConfigBuilder { private QuotaConfig _quotaConfig; private String _refOfflineTableName; private String _refRealtimeTableName; + private TimeBoundaryConfig _timeBoundaryConfig; public LogicalTableConfigBuilder setTableName(String tableName) { @@ -70,6 +72,11 @@ public class LogicalTableConfigBuilder { return this; } + public LogicalTableConfigBuilder setTimeBoundaryConfig(TimeBoundaryConfig timeBoundaryConfig) { + _timeBoundaryConfig = timeBoundaryConfig; + return this; + } + public LogicalTableConfig build() { LogicalTableConfig logicalTableConfig = new LogicalTableConfig(); logicalTableConfig.setTableName(_tableName); @@ -79,6 +86,7 @@ public class LogicalTableConfigBuilder { logicalTableConfig.setQuotaConfig(_quotaConfig); logicalTableConfig.setRefOfflineTableName(_refOfflineTableName); logicalTableConfig.setRefRealtimeTableName(_refRealtimeTableName); + logicalTableConfig.setTimeBoundaryConfig(_timeBoundaryConfig); return logicalTableConfig; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org